This is an automated email from the ASF dual-hosted git repository. fanng pushed a commit to branch stats_job in repository https://gitbox.apache.org/repos/asf/gravitino.git
commit adb57bd3e18d510d390b035b7fbf25733e1cfaf9 Author: fanng <[email protected]> AuthorDate: Wed Mar 4 13:09:41 2026 +0800 [#12688] Refine iceberg update stats job modes and options --- maintenance/jobs/build.gradle.kts | 5 + .../jobs/iceberg/IcebergUpdateStatsJob.java | 286 ++++++++++++------- .../jobs/iceberg/TestIcebergUpdateStatsJob.java | 97 +++++-- .../TestIcebergUpdateStatsJobWithSpark.java | 310 ++++++++++++++++++++- .../job/TestBuiltinIcebergRewriteDataFiles.java | 11 +- .../job/TestBuiltinIcebergUpdateStatsJob.java | 45 ++- 6 files changed, 611 insertions(+), 143 deletions(-) diff --git a/maintenance/jobs/build.gradle.kts b/maintenance/jobs/build.gradle.kts index e88f3ebbef..5b42074c32 100644 --- a/maintenance/jobs/build.gradle.kts +++ b/maintenance/jobs/build.gradle.kts @@ -68,6 +68,11 @@ dependencies { exclude("javax.servlet") } testImplementation(libs.junit.jupiter.api) + testImplementation(libs.awaitility) + testImplementation(libs.testcontainers) + testImplementation(libs.testcontainers.junit.jupiter) + testImplementation(libs.testcontainers.mysql) + testRuntimeOnly(libs.mysql.driver) testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion") { exclude("org.slf4j") exclude("org.apache.logging.log4j") diff --git a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java index e2689825c3..40b1279bab 100644 --- a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergUpdateStatsJob.java @@ -26,7 +26,10 @@ import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; +import java.util.Objects; +import java.util.Optional; import org.apache.gravitino.NameIdentifier; import org.apache.gravitino.job.JobTemplateProvider; import org.apache.gravitino.job.SparkJobTemplate; @@ -61,6 +64,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob { private static final String DEFAULT_STATISTICS_UPDATER = "gravitino-statistics-updater"; private static final String DEFAULT_METRICS_UPDATER = "gravitino-metrics-updater"; private static final long DEFAULT_TARGET_FILE_SIZE_BYTES = 100_000L; + private static final String DEFAULT_UPDATE_MODE = UpdateMode.ALL.modeName; private static final String CUSTOM_STAT_PREFIX = "custom-"; @Override @@ -83,25 +87,16 @@ public class IcebergUpdateStatsJob implements BuiltInJob { Map<String, String> argMap = parseArguments(args); String catalogName = argMap.get("catalog"); String tableIdentifier = argMap.get("table"); - String gravitinoUri = argMap.get("gravitino-uri"); - String metalake = argMap.get("metalake"); - - if (catalogName == null - || tableIdentifier == null - || gravitinoUri == null - || metalake == null) { - System.err.println( - "Error: --catalog, --table, --gravitino-uri and --metalake are required arguments"); + UpdateMode updateMode = parseUpdateMode(argMap.get("update-mode")); + + if (catalogName == null || tableIdentifier == null) { + System.err.println("Error: --catalog and --table are required arguments"); printUsage(); System.exit(1); } - String updaterName = - argMap.getOrDefault("statistics-updater", DEFAULT_STATISTICS_UPDATER).trim(); - boolean enableMetrics = parseEnableMetrics(argMap.get("enable-metrics")); - String metricsUpdaterName = - argMap.getOrDefault("metrics-updater", DEFAULT_METRICS_UPDATER).trim(); long targetFileSizeBytes = parseTargetFileSize(argMap.get("target-file-size-bytes")); + Map<String, String> updaterOptions = parseJsonOptions(argMap.get("updater-options")); String sparkConfJson = argMap.get("spark-conf"); SparkSession.Builder sparkBuilder = @@ -118,19 +113,30 @@ public class IcebergUpdateStatsJob implements BuiltInJob { StatisticsUpdater statisticsUpdater = null; MetricsUpdater metricsUpdater = null; try { - statisticsUpdater = createStatisticsUpdater(updaterName, gravitinoUri, metalake); - if (enableMetrics) { - metricsUpdater = createMetricsUpdater(metricsUpdaterName, gravitinoUri, metalake); + Map<String, String> optimizerProperties = buildOptimizerProperties(updaterOptions); + if (updateMode.updateStats) { + String statisticsUpdaterName = + updaterOptions.getOrDefault("statistics_updater", DEFAULT_STATISTICS_UPDATER).trim(); + statisticsUpdater = + createStatisticsUpdater( + statisticsUpdaterName, requireGravitinoConfig(optimizerProperties)); + } + if (updateMode.updateMetrics) { + String metricsUpdaterName = + updaterOptions.getOrDefault("metrics_updater", DEFAULT_METRICS_UPDATER).trim(); + metricsUpdater = createMetricsUpdater(metricsUpdaterName, optimizerProperties); } + updateStatistics( spark, statisticsUpdater, metricsUpdater, + updateMode, catalogName, tableIdentifier, targetFileSizeBytes); } catch (Exception e) { - LOG.error("Failed to update Iceberg statistics", e); + LOG.error("Failed to update Iceberg statistics/metrics", e); System.exit(1); } finally { if (statisticsUpdater != null) { @@ -158,16 +164,52 @@ public class IcebergUpdateStatsJob implements BuiltInJob { String tableIdentifier, long targetFileSizeBytes) { updateStatistics( - spark, statisticsUpdater, null, catalogName, tableIdentifier, targetFileSizeBytes); + spark, + statisticsUpdater, + null, + UpdateMode.STATS, + catalogName, + tableIdentifier, + targetFileSizeBytes); + } + + static void updateStatistics( + SparkSession spark, + StatisticsUpdater statisticsUpdater, + MetricsUpdater metricsUpdater, + String catalogName, + String tableIdentifier, + long targetFileSizeBytes) { + updateStatistics( + spark, + statisticsUpdater, + metricsUpdater, + UpdateMode.ALL, + catalogName, + tableIdentifier, + targetFileSizeBytes); } static void updateStatistics( SparkSession spark, StatisticsUpdater statisticsUpdater, MetricsUpdater metricsUpdater, + UpdateMode updateMode, String catalogName, String tableIdentifier, long targetFileSizeBytes) { + Objects.requireNonNull(updateMode, "updateMode must not be null"); + + if (updateMode.updateStats && statisticsUpdater == null) { + throw new IllegalArgumentException( + "Statistics updater must be configured when update_mode is stats or all"); + } + + if (updateMode.updateMetrics && metricsUpdater == null) { + throw new IllegalArgumentException( + "Metrics updater must be configured when update_mode is metrics or all"); + } + NameIdentifier gravitinoTableIdentifier = toGravitinoTableIdentifier(catalogName, tableIdentifier); long metricTimestamp = System.currentTimeMillis() / 1000L; @@ -180,33 +222,47 @@ public class IcebergUpdateStatsJob implements BuiltInJob { for (Row row : rows) { PartitionPath partitionPath = toPartitionPath(row.getAs("partition")); List<StatisticEntry<?>> statistics = toStatistics(row); - partitionStatistics.put(partitionPath, statistics); - if (metricsUpdater != null) { + if (updateMode.updateStats) { + partitionStatistics.put(partitionPath, statistics); + } + if (updateMode.updateMetrics) { tableAndPartitionMetrics.addAll( toPartitionMetricPoints( gravitinoTableIdentifier, partitionPath, statistics, metricTimestamp)); } } - statisticsUpdater.updatePartitionStatistics(gravitinoTableIdentifier, partitionStatistics); - if (metricsUpdater != null && !tableAndPartitionMetrics.isEmpty()) { + + if (updateMode.updateStats) { + statisticsUpdater.updatePartitionStatistics(gravitinoTableIdentifier, partitionStatistics); + } + + if (updateMode.updateMetrics && !tableAndPartitionMetrics.isEmpty()) { metricsUpdater.updateTableAndPartitionMetrics(tableAndPartitionMetrics); } + LOG.info( - "Updated partition statistics for {} partitions on {}", - partitionStatistics.size(), + "Updated partition data in mode {} for {} partitions on {}", + updateMode.modeName, + rows.length, gravitinoTableIdentifier); } else { String sql = buildTableStatsSql(catalogName, tableIdentifier, targetFileSizeBytes); Row[] rows = (Row[]) spark.sql(sql).collect(); List<StatisticEntry<?>> tableStatistics = rows.length == 0 ? List.of() : toStatistics(rows[0]); - statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics); - if (metricsUpdater != null && !tableStatistics.isEmpty()) { + + if (updateMode.updateStats) { + statisticsUpdater.updateTableStatistics(gravitinoTableIdentifier, tableStatistics); + } + + if (updateMode.updateMetrics && !tableStatistics.isEmpty()) { metricsUpdater.updateTableAndPartitionMetrics( toTableMetricPoints(gravitinoTableIdentifier, tableStatistics, metricTimestamp)); } + LOG.info( - "Updated table statistics with {} metrics on {}", + "Updated table data in mode {} with {} metrics on {}", + updateMode.modeName, tableStatistics.size(), gravitinoTableIdentifier); } @@ -338,13 +394,17 @@ public class IcebergUpdateStatsJob implements BuiltInJob { } static Map<String, String> parseCustomSparkConfigs(String sparkConfJson) { - if (sparkConfJson == null || sparkConfJson.isEmpty()) { + return parseJsonOptions(sparkConfJson); + } + + static Map<String, String> parseJsonOptions(String json) { + if (json == null || json.isEmpty()) { return new HashMap<>(); } try { ObjectMapper mapper = new ObjectMapper(); Map<String, Object> parsedMap = - mapper.readValue(sparkConfJson, new TypeReference<Map<String, Object>>() {}); + mapper.readValue(json, new TypeReference<Map<String, Object>>() {}); Map<String, String> configs = new HashMap<>(); for (Map.Entry<String, Object> entry : parsedMap.entrySet()) { configs.put(entry.getKey(), entry.getValue() == null ? "" : entry.getValue().toString()); @@ -352,11 +412,7 @@ public class IcebergUpdateStatsJob implements BuiltInJob { return configs; } catch (Exception e) { throw new IllegalArgumentException( - "Failed to parse Spark configurations JSON: " - + sparkConfJson - + ". Error: " - + e.getMessage(), - e); + "Failed to parse JSON options: " + json + ". Error: " + e.getMessage(), e); } } @@ -375,37 +431,59 @@ public class IcebergUpdateStatsJob implements BuiltInJob { } } - static boolean parseEnableMetrics(String value) { + static UpdateMode parseUpdateMode(String value) { if (value == null || value.trim().isEmpty()) { - return false; + return UpdateMode.from(DEFAULT_UPDATE_MODE); } - if ("true".equalsIgnoreCase(value.trim())) { - return true; + return UpdateMode.from(value); + } + + static Map<String, String> buildOptimizerProperties(Map<String, String> updaterOptions) { + Map<String, String> optimizerProperties = new HashMap<>(updaterOptions); + + Optional<String> gravitinoUri = + firstNonEmpty( + updaterOptions.get("gravitino_uri"), + updaterOptions.get("gravitino-uri"), + updaterOptions.get(OptimizerConfig.GRAVITINO_URI)); + Optional<String> metalake = + firstNonEmpty( + updaterOptions.get("metalake"), updaterOptions.get(OptimizerConfig.GRAVITINO_METALAKE)); + + gravitinoUri.ifPresent(uri -> optimizerProperties.put(OptimizerConfig.GRAVITINO_URI, uri)); + metalake.ifPresent(value -> optimizerProperties.put(OptimizerConfig.GRAVITINO_METALAKE, value)); + return optimizerProperties; + } + + static Map<String, String> requireGravitinoConfig(Map<String, String> optimizerProperties) { + String gravitinoUri = optimizerProperties.get(OptimizerConfig.GRAVITINO_URI); + String metalake = optimizerProperties.get(OptimizerConfig.GRAVITINO_METALAKE); + + if (gravitinoUri == null || gravitinoUri.trim().isEmpty()) { + throw new IllegalArgumentException( + "updater_options must contain 'gravitino_uri' when update_mode is stats or all"); } - if ("false".equalsIgnoreCase(value.trim())) { - return false; + + if (metalake == null || metalake.trim().isEmpty()) { + throw new IllegalArgumentException( + "updater_options must contain 'metalake' when update_mode is stats or all"); } - throw new IllegalArgumentException("Invalid enable-metrics value: " + value); + + return optimizerProperties; } private static StatisticsUpdater createStatisticsUpdater( - String updaterName, String gravitinoUri, String metalake) { + String updaterName, Map<String, String> optimizerProperties) { StatisticsUpdater statisticsUpdater = ProviderUtils.createStatisticsUpdaterInstance(updaterName); - Map<String, String> conf = new HashMap<>(); - conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri); - conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake); - statisticsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf))); + statisticsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(optimizerProperties))); return statisticsUpdater; } private static MetricsUpdater createMetricsUpdater( - String updaterName, String gravitinoUri, String metalake) { + String updaterName, Map<String, String> optimizerProperties) { MetricsUpdater metricsUpdater = ProviderUtils.createMetricsUpdaterInstance(updaterName); - Map<String, String> conf = new HashMap<>(); - conf.put(OptimizerConfig.GRAVITINO_URI, gravitinoUri); - conf.put(OptimizerConfig.GRAVITINO_METALAKE, metalake); - metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf))); + metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(optimizerProperties))); return metricsUpdater; } @@ -478,65 +556,83 @@ public class IcebergUpdateStatsJob implements BuiltInJob { return number == null ? 0D : number.doubleValue(); } + private static Optional<String> firstNonEmpty(String... candidates) { + for (String candidate : candidates) { + if (candidate != null && !candidate.trim().isEmpty()) { + return Optional.of(candidate.trim()); + } + } + return Optional.empty(); + } + private static List<String> buildArguments() { return Arrays.asList( "--catalog", "{{catalog_name}}", "--table", "{{table_identifier}}", - "--gravitino-uri", - "{{gravitino_uri}}", - "--metalake", - "{{metalake}}", + "--update-mode", + "{{update_mode}}", "--target-file-size-bytes", "{{target_file_size_bytes}}", - "--statistics-updater", - "{{statistics_updater}}", - "--enable-metrics", - "{{enable_metrics}}", - "--metrics-updater", - "{{metrics_updater}}", + "--updater-options", + "{{updater_options}}", "--spark-conf", "{{spark_conf}}"); } private static Map<String, String> buildSparkConfigs() { - Map<String, String> configs = new HashMap<>(); - configs.put("spark.master", "{{spark_master}}"); - configs.put("spark.executor.instances", "{{spark_executor_instances}}"); - configs.put("spark.executor.cores", "{{spark_executor_cores}}"); - configs.put("spark.executor.memory", "{{spark_executor_memory}}"); - configs.put("spark.driver.memory", "{{spark_driver_memory}}"); - configs.put("spark.sql.catalog.{{catalog_name}}", "org.apache.iceberg.spark.SparkCatalog"); - configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}"); - configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}"); - configs.put("spark.sql.catalog.{{catalog_name}}.warehouse", "{{warehouse_location}}"); - configs.put( - "spark.sql.extensions", - "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions"); - return Collections.unmodifiableMap(configs); + return Collections.emptyMap(); } private static void printUsage() { System.err.println( - "Usage: IcebergUpdateStatsJob [OPTIONS]\n" - + "\n" - + "Required Options:\n" - + " --catalog <name> Iceberg catalog name registered in Spark\n" - + " --table <identifier> Table name in schema.table format\n" - + " --gravitino-uri <uri> Gravitino server URI\n" - + " --metalake <metalake_name> Gravitino metalake name\n" - + "\n" - + "Optional Options:\n" - + " --target-file-size-bytes <bytes> Small-file threshold and MSE target\n" - + " Default: 100000\n" - + " --statistics-updater <name> StatisticsUpdater provider name\n" - + " Default: gravitino-statistics-updater\n" - + " --enable-metrics <true|false> Whether to persist metrics via MetricsUpdater\n" - + " Default: false\n" - + " --metrics-updater <name> MetricsUpdater provider name\n" - + " Default: gravitino-metrics-updater\n" - + " --spark-conf <json> JSON map of custom Spark configs\n" - + " Example: '{\"spark.sql.shuffle.partitions\":\"200\"}'"); + "Usage: IcebergUpdateStatsJob [OPTIONS]\\n" + + "\\n" + + "Required Options:\\n" + + " --catalog <name> Iceberg catalog name registered in Spark\\n" + + " --table <identifier> Table name in schema.table format\\n" + + "\\n" + + "Optional Options:\\n" + + " --update-mode <stats|metrics|all> Update behavior mode, default: all\\n" + + " --target-file-size-bytes <bytes> Small-file threshold and MSE target\\n" + + " Default: 100000\\n" + + " --updater-options <json> JSON map for updater and repository settings\\n" + + " Example: '{\"gravitino_uri\":\"http://localhost:8090\",\\n" + + " \"metalake\":\"test\",\"statistics_updater\":\"gravitino-statistics-updater\",\\n" + + " \"metrics_updater\":\"gravitino-metrics-updater\"}'\\n" + + " --spark-conf <json> JSON map of custom Spark configs\\n" + + " Must include Iceberg catalog configs for --catalog\\n" + + " Example: '{\"spark.master\":\"local[2]\"," + + "\"spark.sql.catalog.rest_catalog\":\"org.apache.iceberg.spark.SparkCatalog\"," + + "\"spark.sql.catalog.rest_catalog.type\":\"rest\"," + + "\"spark.sql.catalog.rest_catalog.uri\":\"http://localhost:9001/iceberg\"}'"); + } + + enum UpdateMode { + STATS("stats", true, false), + METRICS("metrics", false, true), + ALL("all", true, true); + + private final String modeName; + private final boolean updateStats; + private final boolean updateMetrics; + + UpdateMode(String modeName, boolean updateStats, boolean updateMetrics) { + this.modeName = modeName; + this.updateStats = updateStats; + this.updateMetrics = updateMetrics; + } + + static UpdateMode from(String value) { + String normalized = value.trim().toLowerCase(Locale.ROOT); + for (UpdateMode mode : values()) { + if (mode.modeName.equals(normalized)) { + return mode; + } + } + throw new IllegalArgumentException( + "Invalid update_mode value: " + value + ". Supported values are: stats, metrics, all"); + } } } diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java index b035ed4b74..636b93143c 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJob.java @@ -19,7 +19,6 @@ package org.apache.gravitino.maintenance.jobs.iceberg; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -27,6 +26,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Map; import org.apache.gravitino.job.JobTemplateProvider; import org.apache.gravitino.job.SparkJobTemplate; +import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; import org.junit.jupiter.api.Test; public class TestIcebergUpdateStatsJob { @@ -48,23 +48,19 @@ public class TestIcebergUpdateStatsJob { SparkJobTemplate template = job.jobTemplate(); assertNotNull(template.arguments()); - assertEquals(18, template.arguments().size()); + assertEquals(12, template.arguments().size()); assertTrue(template.arguments().contains("--catalog")); assertTrue(template.arguments().contains("{{catalog_name}}")); assertTrue(template.arguments().contains("--table")); assertTrue(template.arguments().contains("{{table_identifier}}")); - assertTrue(template.arguments().contains("--gravitino-uri")); - assertTrue(template.arguments().contains("{{gravitino_uri}}")); - assertTrue(template.arguments().contains("--metalake")); - assertTrue(template.arguments().contains("{{metalake}}")); + assertTrue(template.arguments().contains("--update-mode")); + assertTrue(template.arguments().contains("{{update_mode}}")); assertTrue(template.arguments().contains("--target-file-size-bytes")); assertTrue(template.arguments().contains("{{target_file_size_bytes}}")); - assertTrue(template.arguments().contains("--statistics-updater")); - assertTrue(template.arguments().contains("{{statistics_updater}}")); - assertTrue(template.arguments().contains("--enable-metrics")); - assertTrue(template.arguments().contains("{{enable_metrics}}")); - assertTrue(template.arguments().contains("--metrics-updater")); - assertTrue(template.arguments().contains("{{metrics_updater}}")); + assertTrue(template.arguments().contains("--updater-options")); + assertTrue(template.arguments().contains("{{updater_options}}")); + assertTrue(template.arguments().contains("--spark-conf")); + assertTrue(template.arguments().contains("{{spark_conf}}")); } @Test @@ -72,21 +68,21 @@ public class TestIcebergUpdateStatsJob { String[] args = { "--catalog", "cat", "--table", "db.tbl", - "--gravitino-uri", "http://localhost:8090", - "--metalake", "ml", + "--update-mode", "metrics", "--target-file-size-bytes", "2048", - "--enable-metrics", "true", - "--metrics-updater", "gravitino-metrics-updater" + "--updater-options", "{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}", + "--spark-conf", "{\"spark.master\":\"local[2]\"}" }; Map<String, String> parsed = IcebergUpdateStatsJob.parseArguments(args); assertEquals("cat", parsed.get("catalog")); assertEquals("db.tbl", parsed.get("table")); - assertEquals("http://localhost:8090", parsed.get("gravitino-uri")); - assertEquals("ml", parsed.get("metalake")); + assertEquals("metrics", parsed.get("update-mode")); assertEquals("2048", parsed.get("target-file-size-bytes")); - assertEquals("true", parsed.get("enable-metrics")); - assertEquals("gravitino-metrics-updater", parsed.get("metrics-updater")); + assertEquals( + "{\"metalake\":\"ml\",\"gravitino_uri\":\"http://localhost:8090\"}", + parsed.get("updater-options")); + assertEquals("{\"spark.master\":\"local[2]\"}", parsed.get("spark-conf")); } @Test @@ -95,7 +91,6 @@ public class TestIcebergUpdateStatsJob { String partitionSql = IcebergUpdateStatsJob.buildPartitionStatsSql("cat", "db.tbl", 100000L); assertTrue(tableSql.contains("FROM cat.db.tbl.files")); - assertFalse(tableSql.contains("GROUP BY partition")); assertTrue(tableSql.contains("AS datafile_mse")); assertTrue(partitionSql.contains("FROM cat.db.tbl.files")); assertTrue(partitionSql.contains("GROUP BY partition")); @@ -114,13 +109,59 @@ public class TestIcebergUpdateStatsJob { } @Test - public void testParseEnableMetrics() { - assertFalse(IcebergUpdateStatsJob.parseEnableMetrics(null)); - assertFalse(IcebergUpdateStatsJob.parseEnableMetrics("")); - assertFalse(IcebergUpdateStatsJob.parseEnableMetrics("false")); - assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("true")); - assertTrue(IcebergUpdateStatsJob.parseEnableMetrics("TRUE")); + public void testParseUpdateMode() { + assertEquals(IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode(null)); + assertEquals(IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode("")); + assertEquals( + IcebergUpdateStatsJob.UpdateMode.STATS, IcebergUpdateStatsJob.parseUpdateMode("stats")); + assertEquals( + IcebergUpdateStatsJob.UpdateMode.METRICS, IcebergUpdateStatsJob.parseUpdateMode("metrics")); + assertEquals( + IcebergUpdateStatsJob.UpdateMode.ALL, IcebergUpdateStatsJob.parseUpdateMode("all")); assertThrows( - IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseEnableMetrics("yes")); + IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseUpdateMode("invalid")); + } + + @Test + public void testParseJsonOptions() { + Map<String, String> parsed = + IcebergUpdateStatsJob.parseJsonOptions("{\"a\":\"b\",\"x\":1,\"flag\":true,\"nil\":null}"); + assertEquals("b", parsed.get("a")); + assertEquals("1", parsed.get("x")); + assertEquals("true", parsed.get("flag")); + assertEquals("", parsed.get("nil")); + assertThrows( + IllegalArgumentException.class, () -> IcebergUpdateStatsJob.parseJsonOptions("{not_json}")); + } + + @Test + public void testBuildOptimizerProperties() { + Map<String, String> options = + Map.of( + "gravitino_uri", "http://localhost:8090", + "metalake", "ml", + "gravitino.optimizer.jdbcMetrics.jdbcUrl", "jdbc:mysql://localhost:3306/metrics"); + Map<String, String> optimizerProperties = + IcebergUpdateStatsJob.buildOptimizerProperties(options); + + assertEquals("http://localhost:8090", optimizerProperties.get(OptimizerConfig.GRAVITINO_URI)); + assertEquals("ml", optimizerProperties.get(OptimizerConfig.GRAVITINO_METALAKE)); + assertEquals( + "jdbc:mysql://localhost:3306/metrics", + optimizerProperties.get("gravitino.optimizer.jdbcMetrics.jdbcUrl")); + } + + @Test + public void testRequireGravitinoConfig() { + Map<String, String> optimizerProperties = + Map.of( + OptimizerConfig.GRAVITINO_URI, "http://localhost:8090", + OptimizerConfig.GRAVITINO_METALAKE, "ml"); + assertEquals( + optimizerProperties, IcebergUpdateStatsJob.requireGravitinoConfig(optimizerProperties)); + + assertThrows( + IllegalArgumentException.class, + () -> IcebergUpdateStatsJob.requireGravitinoConfig(Map.of())); } } diff --git a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java index 8f51256ee0..f7e9b9e097 100644 --- a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java +++ b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergUpdateStatsJobWithSpark.java @@ -24,11 +24,25 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.Statement; +import java.time.Duration; +import java.time.Instant; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.gravitino.Catalog; import org.apache.gravitino.NameIdentifier; +import org.apache.gravitino.client.GravitinoAdminClient; +import org.apache.gravitino.client.GravitinoClient; +import org.apache.gravitino.client.GravitinoMetalake; +import org.apache.gravitino.exceptions.NoSuchMetalakeException; +import org.apache.gravitino.job.JobHandle; import org.apache.gravitino.maintenance.optimizer.api.common.DataScope; import org.apache.gravitino.maintenance.optimizer.api.common.MetricPoint; import org.apache.gravitino.maintenance.optimizer.api.common.PartitionPath; @@ -36,15 +50,31 @@ import org.apache.gravitino.maintenance.optimizer.api.common.StatisticEntry; import org.apache.gravitino.maintenance.optimizer.api.updater.MetricsUpdater; import org.apache.gravitino.maintenance.optimizer.api.updater.StatisticsUpdater; import org.apache.gravitino.maintenance.optimizer.common.OptimizerEnv; +import org.apache.gravitino.maintenance.optimizer.common.PartitionEntryImpl; +import org.apache.gravitino.maintenance.optimizer.common.conf.OptimizerConfig; +import org.apache.gravitino.maintenance.optimizer.updater.metrics.GravitinoMetricsUpdater; +import org.apache.gravitino.maintenance.optimizer.updater.metrics.storage.jdbc.GenericJdbcMetricsRepository; +import org.apache.gravitino.rel.Table; +import org.apache.gravitino.stats.Statistic; import org.apache.spark.sql.SparkSession; +import org.awaitility.Awaitility; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable; import org.junit.jupiter.api.io.TempDir; +import org.testcontainers.containers.MySQLContainer; /** Integration tests for IcebergUpdateStatsJob with a real Spark+Iceberg runtime. */ public class TestIcebergUpdateStatsJobWithSpark { + private static final String SERVER_URI = "http://localhost:8090"; + private static final String ICEBERG_REST_URI = "http://localhost:9001/iceberg"; + private static final String JOB_TEMPLATE_NAME = "builtin-iceberg-update-stats"; + private static final String SPARK_CATALOG_NAME = "rest_catalog"; + private static final String METALAKE_NAME = "test"; + @TempDir static File tempDir; private static SparkSession spark; @@ -116,7 +146,13 @@ public class TestIcebergUpdateStatsJobWithSpark { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); IcebergUpdateStatsJob.updateStatistics( - spark, updater, catalogName, "db.non_partitioned", 100_000L); + spark, + updater, + null, + IcebergUpdateStatsJob.UpdateMode.STATS, + catalogName, + "db.non_partitioned", + 100_000L); assertEquals(NameIdentifier.of(catalogName, "db", "non_partitioned"), updater.tableIdentifier); assertNotNull(updater.tableStatistics); @@ -135,7 +171,14 @@ public class TestIcebergUpdateStatsJobWithSpark { public void testUpdatePartitionedTableStatistics() { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); - IcebergUpdateStatsJob.updateStatistics(spark, updater, catalogName, "db.partitioned", 100_000L); + IcebergUpdateStatsJob.updateStatistics( + spark, + updater, + null, + IcebergUpdateStatsJob.UpdateMode.STATS, + catalogName, + "db.partitioned", + 100_000L); assertEquals(NameIdentifier.of(catalogName, "db", "partitioned"), updater.tableIdentifier); assertTrue(updater.tableStatistics.isEmpty()); @@ -155,12 +198,18 @@ public class TestIcebergUpdateStatsJobWithSpark { } @Test - public void testUpdatePartitionedTableStatisticsWithMetrics() { + public void testUpdatePartitionedTableStatisticsAndMetrics() { RecordingStatisticsUpdater statisticsUpdater = new RecordingStatisticsUpdater(); RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); IcebergUpdateStatsJob.updateStatistics( - spark, statisticsUpdater, metricsUpdater, catalogName, "db.partitioned", 100_000L); + spark, + statisticsUpdater, + metricsUpdater, + IcebergUpdateStatsJob.UpdateMode.ALL, + catalogName, + "db.partitioned", + 100_000L); assertEquals( NameIdentifier.of(catalogName, "db", "partitioned"), statisticsUpdater.tableIdentifier); @@ -180,12 +229,117 @@ public class TestIcebergUpdateStatsJobWithSpark { assertTrue(metricsUpdater.jobMetrics.isEmpty()); } + @Test + public void testUpdatePartitionedTableMetricsOnly() { + RecordingMetricsUpdater metricsUpdater = new RecordingMetricsUpdater(); + + IcebergUpdateStatsJob.updateStatistics( + spark, + null, + metricsUpdater, + IcebergUpdateStatsJob.UpdateMode.METRICS, + catalogName, + "db.partitioned", + 100_000L); + + assertEquals(16, metricsUpdater.tableMetrics.size()); + assertTrue( + metricsUpdater.tableMetrics.stream() + .allMatch(metric -> metric.scope() == DataScope.Type.PARTITION)); + assertTrue(metricsUpdater.jobMetrics.isEmpty()); + } + + @Test + @Tag("gravitino-docker-test") + public void testUpdatePartitionedTableMetricsStoredInMySql() throws Exception { + try (MySQLContainer<?> mysql = new MySQLContainer<>("mysql:8.0.33")) { + mysql.start(); + + Map<String, String> conf = new HashMap<>(); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcUrl", mysql.getJdbcUrl()); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcUser", mysql.getUsername()); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcPassword", mysql.getPassword()); + conf.put("gravitino.optimizer.jdbcMetrics.jdbcDriver", mysql.getDriverClassName()); + initializeMySqlMetricsSchema(mysql); + + GravitinoMetricsUpdater metricsUpdater = new GravitinoMetricsUpdater(); + metricsUpdater.initialize(new OptimizerEnv(new OptimizerConfig(conf))); + try { + IcebergUpdateStatsJob.updateStatistics( + spark, + null, + metricsUpdater, + IcebergUpdateStatsJob.UpdateMode.METRICS, + catalogName, + "db.partitioned", + 100_000L); + } finally { + metricsUpdater.close(); + } + + GenericJdbcMetricsRepository repository = new GenericJdbcMetricsRepository(); + repository.initialize(conf); + try { + NameIdentifier identifier = NameIdentifier.of(catalogName, "db", "partitioned"); + long now = Instant.now().getEpochSecond(); + + PartitionPath partitionPath = + PartitionPath.of(List.of(new PartitionEntryImpl("ds", "2026-01-01"))); + List<MetricPoint> partitionMetrics = + repository.getMetrics( + DataScope.forPartition(identifier, partitionPath), now - 300, now + 300); + assertEquals(8, partitionMetrics.size()); + } finally { + repository.close(); + } + } + } + + private static void initializeMySqlMetricsSchema(MySQLContainer<?> mysql) throws Exception { + String createTableMetrics = + "CREATE TABLE IF NOT EXISTS `table_metrics` (" + + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT," + + "`table_identifier` VARCHAR(1024) NOT NULL," + + "`metric_name` VARCHAR(1024) NOT NULL," + + "`table_partition` VARCHAR(1024) DEFAULT NULL," + + "`metric_ts` BIGINT(20) NOT NULL," + + "`metric_value` VARCHAR(1024) NOT NULL," + + "PRIMARY KEY (`id`)," + + "KEY `idx_table_metrics_metric_ts` (`metric_ts`)," + + "KEY `idx_table_metrics_composite` (`table_identifier`(255), `table_partition`(255), `metric_ts`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"; + String createJobMetrics = + "CREATE TABLE IF NOT EXISTS `job_metrics` (" + + "`id` BIGINT(20) UNSIGNED NOT NULL AUTO_INCREMENT," + + "`job_identifier` VARCHAR(1024) NOT NULL," + + "`metric_name` VARCHAR(1024) NOT NULL," + + "`metric_ts` BIGINT(20) NOT NULL," + + "`metric_value` VARCHAR(1024) NOT NULL," + + "PRIMARY KEY (`id`)," + + "KEY `idx_job_metrics_metric_ts` (`metric_ts`)," + + "KEY `idx_job_metrics_identifier_metric_ts` (`job_identifier`(255), `metric_ts`)" + + ") ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin"; + try (Connection connection = + DriverManager.getConnection( + mysql.getJdbcUrl(), mysql.getUsername(), mysql.getPassword()); + Statement statement = connection.createStatement()) { + statement.execute(createTableMetrics); + statement.execute(createJobMetrics); + } + } + @Test public void testUpdateMultiLevelPartitionedTableStatistics() { RecordingStatisticsUpdater updater = new RecordingStatisticsUpdater(); IcebergUpdateStatsJob.updateStatistics( - spark, updater, catalogName, "db.multi_partitioned", 100_000L); + spark, + updater, + null, + IcebergUpdateStatsJob.UpdateMode.STATS, + catalogName, + "db.multi_partitioned", + 100_000L); assertEquals( NameIdentifier.of(catalogName, "db", "multi_partitioned"), updater.tableIdentifier); @@ -215,6 +369,152 @@ public class TestIcebergUpdateStatsJobWithSpark { parsedPartitions); } + // Requires a running deploy-mode Gravitino server and Spark environment. + @Test + @EnabledIfEnvironmentVariable(named = "GRAVITINO_ENV_IT", matches = "true") + public void testRunBuiltInUpdateStatsJobViaServer() throws Exception { + String tableName = "jobs_it_update_stats_" + UUID.randomUUID().toString().replace("-", ""); + String fullTableName = SPARK_CATALOG_NAME + ".db." + tableName; + + try (SparkSession restSpark = createRestSparkSession(); + GravitinoAdminClient adminClient = GravitinoAdminClient.builder(SERVER_URI).build()) { + GravitinoMetalake metalake = loadOrCreateMetalake(adminClient, METALAKE_NAME); + recreateRestCatalog(metalake); + createTableAndInsertData(restSpark, fullTableName); + + submitJob(metalake, buildUpdateStatsJobConfig(tableName)); + + try (GravitinoClient client = + GravitinoClient.builder(SERVER_URI).withMetalake(METALAKE_NAME).build()) { + Table table = + client + .loadCatalog(SPARK_CATALOG_NAME) + .asTableCatalog() + .loadTable(NameIdentifier.of("db", tableName)); + List<Statistic> statistics = table.supportsStatistics().listStatistics(); + Map<String, Statistic> statisticMap = + statistics.stream().collect(Collectors.toMap(Statistic::name, s -> s)); + assertTrue(statisticMap.containsKey("custom-file_count")); + assertTrue(statisticMap.containsKey("custom-datafile_mse")); + assertTrue(statisticMap.containsKey("custom-total_size")); + } + } + } + + private static SparkSession createRestSparkSession() { + return SparkSession.builder() + .master("local[2]") + .appName("jobs-iceberg-update-stats-it") + .config( + "spark.sql.extensions", + "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME, "org.apache.iceberg.spark.SparkCatalog") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".type", "rest") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".cache-enabled", "false") + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".uri", ICEBERG_REST_URI) + .config("spark.sql.catalog." + SPARK_CATALOG_NAME + ".warehouse", "") + .getOrCreate(); + } + + private static void createTableAndInsertData(SparkSession sparkSession, String fullTableName) { + sparkSession.sql("CREATE NAMESPACE IF NOT EXISTS " + SPARK_CATALOG_NAME + ".db"); + sparkSession.sql("DROP TABLE IF EXISTS " + fullTableName); + sparkSession.sql("CREATE TABLE " + fullTableName + " (id INT, data STRING) USING iceberg"); + sparkSession.sql( + "ALTER TABLE " + + fullTableName + + " SET TBLPROPERTIES ('write.target-file-size-bytes'='1024000')"); + for (int i = 0; i < 10; i++) { + sparkSession.sql("INSERT INTO " + fullTableName + " VALUES (" + i + ", 'value_" + i + "')"); + } + } + + private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { + Map<String, String> jobConf = new HashMap<>(); + jobConf.put("table_identifier", "db." + tableName); + jobConf.put("update_mode", "all"); + jobConf.put("target_file_size_bytes", "100000"); + jobConf.put( + "updater_options", + "{\"gravitino_uri\":\"" + + SERVER_URI + + "\",\"metalake\":\"" + + METALAKE_NAME + + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}"); + jobConf.put("catalog_name", SPARK_CATALOG_NAME); + jobConf.put( + "spark_conf", + "{\"spark.master\":\"local[2]\"," + + "\"spark.executor.instances\":\"1\"," + + "\"spark.executor.cores\":\"1\"," + + "\"spark.executor.memory\":\"1g\"," + + "\"spark.driver.memory\":\"1g\"," + + "\"spark.hadoop.fs.defaultFS\":\"file:///\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + "\":\"org.apache.iceberg.spark.SparkCatalog\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + ".type\":\"rest\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + ".uri\":\"" + + ICEBERG_REST_URI + + "\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + ".warehouse\":\"\"," + + "\"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}"); + return jobConf; + } + + private static void submitJob(GravitinoMetalake metalake, Map<String, String> jobConf) { + JobHandle jobHandle = metalake.runJob(JOB_TEMPLATE_NAME, jobConf); + assertTrue(StringUtils.isNotBlank(jobHandle.jobId()), "Job id should not be blank"); + + Awaitility.await() + .atMost(Duration.ofMinutes(5)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> { + JobHandle.Status status = metalake.getJob(jobHandle.jobId()).jobStatus(); + return status == JobHandle.Status.SUCCEEDED + || status == JobHandle.Status.FAILED + || status == JobHandle.Status.CANCELLED; + }); + + JobHandle.Status finalStatus = metalake.getJob(jobHandle.jobId()).jobStatus(); + assertEquals(JobHandle.Status.SUCCEEDED, finalStatus, "Job should succeed"); + } + + private static GravitinoMetalake loadOrCreateMetalake( + GravitinoAdminClient client, String metalakeName) { + try { + return client.loadMetalake(metalakeName); + } catch (NoSuchMetalakeException ignored) { + return client.createMetalake(metalakeName, "IT metalake", Map.of()); + } + } + + private static void recreateRestCatalog(GravitinoMetalake metalake) { + try { + metalake.dropCatalog(SPARK_CATALOG_NAME, true); + } catch (Exception ignored) { + // Ignore when the catalog does not exist, or when force-drop is not needed. + } + + Map<String, String> properties = new HashMap<>(); + properties.put("catalog-backend", "REST"); + properties.put("uri", ICEBERG_REST_URI); + + metalake.createCatalog( + SPARK_CATALOG_NAME, + Catalog.Type.RELATIONAL, + "lakehouse-iceberg", + "IT Iceberg REST catalog", + properties); + } + private static final class RecordingStatisticsUpdater implements StatisticsUpdater { private NameIdentifier tableIdentifier; private List<StatisticEntry<?>> tableStatistics = List.of(); diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java index bc6285c41b..6514ac755e 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergRewriteDataFiles.java @@ -309,10 +309,15 @@ public class TestBuiltinIcebergRewriteDataFiles { private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { Map<String, String> jobConf = new HashMap<>(); jobConf.put("table_identifier", "db." + tableName); - jobConf.put("gravitino_uri", SERVER_URI); - jobConf.put("metalake", METALAKE_NAME); + jobConf.put("update_mode", "all"); jobConf.put("target_file_size_bytes", "100000"); - jobConf.put("statistics_updater", "gravitino-statistics-updater"); + jobConf.put( + "updater_options", + "{\"gravitino_uri\":\"" + + SERVER_URI + + "\",\"metalake\":\"" + + METALAKE_NAME + + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}"); jobConf.putAll(createOptimizerConfig().jobSubmitterConfigs()); return jobConf; } diff --git a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java index 003bb16a0f..b2b8fbb4e4 100644 --- a/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java +++ b/maintenance/optimizer/src/test/java/org/apache/gravitino/maintenance/optimizer/recommender/job/TestBuiltinIcebergUpdateStatsJob.java @@ -114,20 +114,41 @@ public class TestBuiltinIcebergUpdateStatsJob { private static Map<String, String> buildUpdateStatsJobConfig(String tableName) { Map<String, String> jobConf = new HashMap<>(); jobConf.put("table_identifier", "db." + tableName); - jobConf.put("gravitino_uri", SERVER_URI); - jobConf.put("metalake", METALAKE_NAME); + jobConf.put("update_mode", "all"); jobConf.put("target_file_size_bytes", "100000"); - jobConf.put("statistics_updater", "gravitino-statistics-updater"); + jobConf.put( + "updater_options", + "{\"gravitino_uri\":\"" + + SERVER_URI + + "\",\"metalake\":\"" + + METALAKE_NAME + + "\",\"statistics_updater\":\"gravitino-statistics-updater\"}"); jobConf.put("catalog_name", SPARK_CATALOG_NAME); - jobConf.put("catalog_type", "rest"); - jobConf.put("catalog_uri", ICEBERG_REST_URI); - jobConf.put("warehouse_location", WAREHOUSE_LOCATION); - jobConf.put("spark_master", "local[2]"); - jobConf.put("spark_executor_instances", "1"); - jobConf.put("spark_executor_cores", "1"); - jobConf.put("spark_executor_memory", "1g"); - jobConf.put("spark_driver_memory", "1g"); - jobConf.put("spark_conf", "{\"spark.hadoop.fs.defaultFS\":\"file:///\"}"); + jobConf.put( + "spark_conf", + "{\"spark.master\":\"local[2]\"," + + "\"spark.executor.instances\":\"1\"," + + "\"spark.executor.cores\":\"1\"," + + "\"spark.executor.memory\":\"1g\"," + + "\"spark.driver.memory\":\"1g\"," + + "\"spark.hadoop.fs.defaultFS\":\"file:///\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + "\":\"org.apache.iceberg.spark.SparkCatalog\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + ".type\":\"rest\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + ".uri\":\"" + + ICEBERG_REST_URI + + "\"," + + "\"spark.sql.catalog." + + SPARK_CATALOG_NAME + + ".warehouse\":\"" + + WAREHOUSE_LOCATION + + "\"," + + "\"spark.sql.extensions\":\"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"}"); return jobConf; }
