This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 02a0aeeeac [#9543] feat(jobs): Add the built-in Iceberg rewrite data
files job template to Gravitino (#9588)
02a0aeeeac is described below
commit 02a0aeeeac902d02ceedbd099d08962ba55d2a22
Author: Jerry Shao <[email protected]>
AuthorDate: Wed Feb 4 14:46:16 2026 +0800
[#9543] feat(jobs): Add the built-in Iceberg rewrite data files job
template to Gravitino (#9588)
This commit implements a built-in job template for rewriting Iceberg
table data files, which supports binpack and sort strategies for table
optimization.
Key Features:
- Named argument parser supporting flexible parameter combinations
- Calls Iceberg's native rewrite_data_files stored procedure
- Supports all rewrite strategies: binpack, sort
- Configurable options for file sizes, thresholds, and behavior
- Template-based configuration for Spark and Iceberg catalogs
- Handles both Iceberg 1.6.1 (4 columns) and newer versions (5 columns)
Implementation:
- IcebergRewriteDataFilesJob.java (335 lines)
- Template name: builtin-iceberg-rewrite-data-files
- Version: v1
- Arguments: --catalog, --table, --strategy, --sort-order, --where,
--options
- Spark configs for runtime and Iceberg catalog setup
- BuiltInJobTemplateProvider.java (modified)
- Registered new IcebergRewriteDataFilesJob
- build.gradle.kts (modified)
- Added Iceberg Spark runtime dependency (1.6.1)
- Added Spark, Scala, and Hadoop test dependencies
Tests (41 tests, all passing):
- TestIcebergRewriteDataFilesJob.java (33 tests, 429 lines)
- Template structure validation
- Argument parsing (required, optional, empty values, order-independent)
- JSON options parsing (single, multiple, boolean, empty)
- SQL generation (minimal, with strategy, sort, where, options, all
params)
- TestIcebergRewriteDataFilesJobWithSpark.java (8 tests, 229 lines)
- Real Spark session integration tests
- Executes actual Iceberg rewrite_data_files procedures
- Validates data integrity after rewrite operations
- Tests all parameter combinations with live Iceberg catalog
Usage Examples:
```
--catalog iceberg_prod --table db.sample
--catalog iceberg_prod --table db.sample --strategy sort \
--sort-order 'id DESC NULLS LAST'
--catalog iceberg_prod --table db.sample --strategy sort \
--sort-order 'zorder(user_id, event_type, timestamp)'
--catalog iceberg_prod --table db.sample --where 'year = 2024' \
--options '{"min-input-files":"2","remove-dangling-deletes":"true"}'
```
Fix: #9543
---
maintenance/jobs/build.gradle.kts | 33 +-
.../jobs/BuiltInJobTemplateProvider.java | 4 +-
.../jobs/iceberg/IcebergRewriteDataFilesJob.java | 522 +++++++++++++++
.../iceberg/TestIcebergRewriteDataFilesJob.java | 711 +++++++++++++++++++++
.../TestIcebergRewriteDataFilesJobWithSpark.java | 435 +++++++++++++
5 files changed, 1703 insertions(+), 2 deletions(-)
diff --git a/maintenance/jobs/build.gradle.kts
b/maintenance/jobs/build.gradle.kts
index 9a0266f7d7..1cec173984 100644
--- a/maintenance/jobs/build.gradle.kts
+++ b/maintenance/jobs/build.gradle.kts
@@ -31,19 +31,44 @@ repositories {
val scalaVersion: String = project.properties["scalaVersion"] as? String ?:
extra["defaultScalaVersion"].toString()
val sparkVersion: String = libs.versions.spark35.get()
+val icebergVersion: String = libs.versions.iceberg4connector.get()
+val sparkMajorVersion = "3.5"
dependencies {
- compileOnly(project(":api"))
+ implementation(project(":api"))
compileOnly(libs.slf4j.api)
+ compileOnly(libs.jackson.databind)
compileOnly("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion") {
exclude("org.slf4j")
exclude("org.apache.logging.log4j")
}
+ // Iceberg dependencies for rewrite data files job
+
compileOnly("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
{
+ exclude("org.slf4j")
+ exclude("org.apache.logging.log4j")
+ }
+
testImplementation(project(":api"))
testImplementation(libs.bundles.log4j)
+ testImplementation(libs.hadoop3.common) {
+ exclude("org.slf4j")
+ exclude("org.apache.logging.log4j")
+ exclude("com.sun.jersey")
+ exclude("javax.servlet")
+ }
testImplementation(libs.junit.jupiter.api)
+
testImplementation("org.apache.iceberg:iceberg-spark-runtime-${sparkMajorVersion}_$scalaVersion:$icebergVersion")
{
+ exclude("org.slf4j")
+ exclude("org.apache.logging.log4j")
+ }
+ testImplementation("org.apache.spark:spark-sql_$scalaVersion:$sparkVersion")
{
+ exclude("org.slf4j")
+ exclude("org.apache.logging.log4j")
+ }
+
testImplementation("org.scala-lang.modules:scala-collection-compat_$scalaVersion:2.7.0")
+
testRuntimeOnly(libs.junit.jupiter.engine)
}
@@ -53,11 +78,17 @@ tasks.test {
tasks.withType(ShadowJar::class.java) {
isZip64 = true
+ configurations = listOf(project.configurations.runtimeClasspath.get())
archiveClassifier.set("")
mergeServiceFiles()
dependencies {
+ relocate("com.google", "org.apache.gravitino.shaded.com.google")
+ relocate("org.apache.commons",
"org.apache.gravitino.shaded.org.apache.commons")
+ relocate("com.fasterxml", "org.apache.gravitino.shaded.com.fasterxml")
+
exclude(dependency("org.apache.spark:.*"))
+ exclude(dependency("org.apache.iceberg:.*"))
exclude(dependency("org.slf4j:slf4j-api"))
exclude(dependency("org.apache.logging.log4j:.*"))
}
diff --git
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
index 3758fe8d26..8620c2330c 100644
---
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
+++
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/BuiltInJobTemplateProvider.java
@@ -25,6 +25,7 @@ import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.gravitino.job.JobTemplate;
import org.apache.gravitino.job.JobTemplateProvider;
+import
org.apache.gravitino.maintenance.jobs.iceberg.IcebergRewriteDataFilesJob;
import org.apache.gravitino.maintenance.jobs.spark.SparkPiJob;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,7 +40,8 @@ public class BuiltInJobTemplateProvider implements
JobTemplateProvider {
private static final Pattern VERSION_PATTERN =
Pattern.compile(JobTemplateProvider.VERSION_VALUE_PATTERN);
- private static final List<BuiltInJob> BUILT_IN_JOBS = ImmutableList.of(new
SparkPiJob());
+ private static final List<BuiltInJob> BUILT_IN_JOBS =
+ ImmutableList.of(new SparkPiJob(), new IcebergRewriteDataFilesJob());
@Override
public List<? extends JobTemplate> jobTemplates() {
diff --git
a/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
new file mode 100644
index 0000000000..5324204434
--- /dev/null
+++
b/maintenance/jobs/src/main/java/org/apache/gravitino/maintenance/jobs/iceberg/IcebergRewriteDataFilesJob.java
@@ -0,0 +1,522 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.apache.gravitino.maintenance.jobs.BuiltInJob;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * Built-in job for rewriting Iceberg table data files.
+ *
+ * <p>This job leverages Iceberg's RewriteDataFilesProcedure to optimize data
file layout through
+ * binpack or sort strategies. Z-order is a type of sort order, not a strategy.
+ */
+public class IcebergRewriteDataFilesJob implements BuiltInJob {
+
+ private static final String NAME =
+ JobTemplateProvider.BUILTIN_NAME_PREFIX + "iceberg-rewrite-data-files";
+ private static final String VERSION = "v1";
+
+ // Valid strategy values for Iceberg rewrite_data_files procedure
+ private static final String STRATEGY_BINPACK = "binpack";
+ private static final String STRATEGY_SORT = "sort";
+
+ @Override
+ public SparkJobTemplate jobTemplate() {
+ return SparkJobTemplate.builder()
+ .withName(NAME)
+ .withComment("Built-in Iceberg rewrite data files job template for
table optimization")
+ .withExecutable(resolveExecutable(IcebergRewriteDataFilesJob.class))
+ .withClassName(IcebergRewriteDataFilesJob.class.getName())
+ .withArguments(buildArguments())
+ .withConfigs(buildSparkConfigs())
+ .withCustomFields(
+ Collections.singletonMap(JobTemplateProvider.PROPERTY_VERSION_KEY,
VERSION))
+ .build();
+ }
+
+ /**
+ * Main entry point for the rewrite data files job.
+ *
+ * <p>Uses named arguments for flexibility:
+ *
+ * <ul>
+ * <li>--catalog <catalog_name> Required. Iceberg catalog name.
+ * <li>--table <table_identifier> Required. Table name (db.table)
+ * <li>--strategy <strategy> Optional. binpack or sort
+ * <li>--sort-order <sort_order> Optional. Sort order specification
+ * <li>--where <where_clause> Optional. Filter predicate
+ * <li>--options <options_json> Optional. JSON map of options
+ * <li>--spark-conf <spark_conf_json> Optional. JSON map of custom
Spark configurations
+ * </ul>
+ *
+ * <p><b>Important Notes on Special Characters:</b>
+ *
+ * <ul>
+ * <li><b>Via Gravitino API:</b> Pass values as-is without shell escaping.
Example: {@code
+ * jobConf.put("options", "{\"a\":\"b\"}")} - Gravitino handles
escaping internally via
+ * ProcessBuilder.
+ * <li><b>Via Command Line:</b> Use shell quoting. Example: {@code
--options
+ * '{"min-input-files":"2"}'}
+ * <li><b>SQL Single Quotes:</b> Use as-is in where clauses. Example:
{@code --where "status =
+ * 'active'"} - Single quotes are automatically escaped for SQL.
+ * <li><b>JSON Values:</b> Must be valid JSON strings. The job parses and
validates JSON before
+ * use.
+ * </ul>
+ *
+ * <p>Example via command line: --catalog iceberg_catalog --table db.sample
--strategy binpack
+ * --options '{"min-input-files":"2"}' --spark-conf
'{"spark.sql.shuffle.partitions":"200"}'
+ *
+ * <p>Example via Gravitino API:
+ *
+ * <pre>{@code
+ * Map<String, String> jobConf = new HashMap<>();
+ * jobConf.put("catalog_name", "iceberg_catalog");
+ * jobConf.put("table_identifier", "db.sample");
+ * jobConf.put("options", "{\"min-input-files\":\"2\"}"); // No shell
escaping needed
+ * jobConf.put("where_clause", "year = 2024 and status = 'active'"); // SQL
quotes handled
+ * metalake.runJob("builtin-iceberg-rewrite-data-files", jobConf);
+ * }</pre>
+ */
+ public static void main(String[] args) {
+ if (args.length < 4) {
+ printUsage();
+ System.exit(1);
+ }
+
+ // Parse named arguments
+ Map<String, String> argMap = parseArguments(args);
+
+ // Validate required arguments
+ String catalogName = argMap.get("catalog");
+ String tableIdentifier = argMap.get("table");
+
+ if (catalogName == null || tableIdentifier == null) {
+ System.err.println("Error: --catalog and --table are required
arguments");
+ printUsage();
+ System.exit(1);
+ }
+
+ // Optional arguments
+ String strategy = argMap.get("strategy");
+ String sortOrder = argMap.get("sort-order");
+ String whereClause = argMap.get("where");
+ String optionsJson = argMap.get("options");
+ String sparkConfJson = argMap.get("spark-conf");
+
+ // Validate strategy if provided
+ try {
+ validateStrategy(strategy);
+ } catch (IllegalArgumentException e) {
+ System.err.println("Error: " + e.getMessage());
+ printUsage();
+ System.exit(1);
+ }
+
+ // Build Spark session with custom configs if provided
+ SparkSession.Builder sparkBuilder =
+ SparkSession.builder().appName("Gravitino Built-in Iceberg Rewrite
Data Files");
+
+ // Apply custom Spark configurations if provided
+ if (sparkConfJson != null && !sparkConfJson.isEmpty()) {
+ try {
+ Map<String, String> customConfigs =
parseCustomSparkConfigs(sparkConfJson);
+ for (Map.Entry<String, String> entry : customConfigs.entrySet()) {
+ sparkBuilder.config(entry.getKey(), entry.getValue());
+ }
+ System.out.println("Applied custom Spark configurations: " +
customConfigs);
+ } catch (IllegalArgumentException e) {
+ System.err.println("Error: " + e.getMessage());
+ printUsage();
+ System.exit(1);
+ }
+ }
+
+ SparkSession spark = sparkBuilder.getOrCreate();
+
+ try {
+ // Build the procedure call SQL
+ String sql =
+ buildProcedureCall(
+ catalogName, tableIdentifier, strategy, sortOrder, whereClause,
optionsJson);
+
+ System.out.println("Executing Iceberg rewrite_data_files procedure: " +
sql);
+
+ // Execute the procedure
+ Row[] results = (Row[]) spark.sql(sql).collect();
+
+ // Print results
+ if (results.length > 0) {
+ Row result = results[0];
+ // Iceberg 1.6.1 returns 4 columns, newer versions may return 5
+ if (result.size() >= 5) {
+ System.out.printf(
+ "Rewrite Data Files Results:%n"
+ + " Rewritten data files: %d%n"
+ + " Added data files: %d%n"
+ + " Rewritten bytes: %d%n"
+ + " Failed data files: %d%n"
+ + " Removed delete files: %d%n",
+ result.getInt(0),
+ result.getInt(1),
+ result.getLong(2),
+ result.getInt(3),
+ result.getInt(4));
+ } else {
+ System.out.printf(
+ "Rewrite Data Files Results:%n"
+ + " Rewritten data files: %d%n"
+ + " Added data files: %d%n"
+ + " Rewritten bytes: %d%n"
+ + " Failed data files: %d%n",
+ result.getInt(0), result.getInt(1), result.getLong(2),
result.getInt(3));
+ }
+ }
+
+ System.out.println("Rewrite data files job completed successfully");
+ } catch (Exception e) {
+ System.err.println("Error executing rewrite data files job: " +
e.getMessage());
+ e.printStackTrace();
+ System.exit(1);
+ } finally {
+ spark.stop();
+ }
+ }
+
+ /**
+ * Build the SQL CALL statement for the rewrite_data_files procedure.
+ *
+ * @param catalogName Iceberg catalog name
+ * @param tableIdentifier Fully qualified table name
+ * @param strategy Rewrite strategy (binpack or sort)
+ * @param sortOrder Sort order specification
+ * @param whereClause Filter predicate
+ * @param optionsJson JSON map of options
+ * @return SQL CALL statement
+ */
+ static String buildProcedureCall(
+ String catalogName,
+ String tableIdentifier,
+ String strategy,
+ String sortOrder,
+ String whereClause,
+ String optionsJson) {
+
+ StringBuilder sql = new StringBuilder();
+ sql.append("CALL ")
+ .append(escapeSqlIdentifier(catalogName))
+ .append(".system.rewrite_data_files(");
+ sql.append("table =>
'").append(escapeSqlString(tableIdentifier)).append("'");
+
+ if (strategy != null && !strategy.isEmpty()) {
+ sql.append(", strategy =>
'").append(escapeSqlString(strategy)).append("'");
+ }
+
+ if (sortOrder != null && !sortOrder.isEmpty()) {
+ sql.append(", sort_order =>
'").append(escapeSqlString(sortOrder)).append("'");
+ }
+
+ if (whereClause != null && !whereClause.isEmpty()) {
+ sql.append(", where =>
'").append(escapeSqlString(whereClause)).append("'");
+ }
+
+ if (optionsJson != null && !optionsJson.isEmpty()) {
+ // Parse JSON and convert to map syntax for Iceberg procedure
+ Map<String, String> options = parseOptionsJson(optionsJson);
+ if (!options.isEmpty()) {
+ sql.append(", options => map(");
+ boolean first = true;
+ for (Map.Entry<String, String> entry : options.entrySet()) {
+ if (!first) {
+ sql.append(", ");
+ }
+ sql.append("'")
+ .append(escapeSqlString(entry.getKey()))
+ .append("', '")
+ .append(escapeSqlString(entry.getValue()))
+ .append("'");
+ first = false;
+ }
+ sql.append(")");
+ }
+ }
+
+ sql.append(")");
+ return sql.toString();
+ }
+
+ /**
+ * Escape single quotes in SQL string literals by replacing ' with ''.
+ *
+ * @param value the string value to escape
+ * @return escaped string safe for use in SQL string literals
+ */
+ static String escapeSqlString(String value) {
+ if (value == null) {
+ return null;
+ }
+ return value.replace("'", "''");
+ }
+
+ /**
+ * Escape SQL identifiers by replacing backticks and validating format.
+ *
+ * @param identifier the SQL identifier to escape
+ * @return escaped identifier safe for use in SQL
+ */
+ static String escapeSqlIdentifier(String identifier) {
+ if (identifier == null) {
+ return null;
+ }
+ // Replace backticks to prevent breaking out of identifier quotes
+ return identifier.replace("`", "``");
+ }
+
+ /**
+ * Parse command line arguments in --key value format.
+ *
+ * @param args command line arguments
+ * @return map of argument names to values
+ */
+ static Map<String, String> parseArguments(String[] args) {
+ Map<String, String> argMap = new HashMap<>();
+
+ for (int i = 0; i < args.length; i++) {
+ if (args[i].startsWith("--")) {
+ String key = args[i].substring(2); // Remove "--" prefix
+
+ // Check if there's a value for this key
+ if (i + 1 < args.length && !args[i + 1].startsWith("--")) {
+ String value = args[i + 1];
+ // Only add non-empty values
+ if (value != null && !value.trim().isEmpty()) {
+ argMap.put(key, value);
+ }
+ i++; // Skip the value in next iteration
+ } else {
+ System.err.println("Warning: Flag " + args[i] + " has no value,
ignoring");
+ }
+ }
+ }
+
+ return argMap;
+ }
+
+ /**
+ * Validate the strategy parameter value.
+ *
+ * @param strategy the strategy value to validate
+ * @throws IllegalArgumentException if the strategy is invalid
+ */
+ static void validateStrategy(String strategy) {
+ if (strategy == null || strategy.isEmpty()) {
+ return; // Strategy is optional
+ }
+
+ if (!STRATEGY_BINPACK.equals(strategy) && !STRATEGY_SORT.equals(strategy))
{
+ throw new IllegalArgumentException(
+ "Invalid strategy '"
+ + strategy
+ + "'. Valid values are: '"
+ + STRATEGY_BINPACK
+ + "', '"
+ + STRATEGY_SORT
+ + "'");
+ }
+ }
+
+ /**
+ * Parse custom Spark configurations from JSON string.
+ *
+ * @param sparkConfJson JSON string containing Spark configurations
+ * @return map of Spark configuration keys to values
+ * @throws IllegalArgumentException if JSON parsing fails
+ */
+ static Map<String, String> parseCustomSparkConfigs(String sparkConfJson) {
+ if (sparkConfJson == null || sparkConfJson.isEmpty()) {
+ return new HashMap<>();
+ }
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ Map<String, Object> parsedMap =
+ mapper.readValue(sparkConfJson, new TypeReference<Map<String,
Object>>() {});
+
+ Map<String, String> configs = new HashMap<>();
+ for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ configs.put(key, value == null ? "" : value.toString());
+ }
+ return configs;
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Failed to parse Spark configurations JSON: "
+ + sparkConfJson
+ + ". Error: "
+ + e.getMessage(),
+ e);
+ }
+ }
+
+ /** Print usage information. */
+ private static void printUsage() {
+ System.err.println(
+ "Usage: IcebergRewriteDataFilesJob [OPTIONS]\n"
+ + "\n"
+ + "Required Options:\n"
+ + " --catalog <name> Iceberg catalog name registered in
Spark\n"
+ + " --table <identifier> Fully qualified table name (e.g.,
db.table_name)\n"
+ + "\n"
+ + "Optional Options:\n"
+ + " --strategy <name> Rewrite strategy: binpack (default)
or sort\n"
+ + " --sort-order <spec> Sort order specification:\n"
+ + " For columns: 'id DESC NULLS LAST,
name ASC'\n"
+ + " For Z-Order: 'zorder(c1,c2,c3)'\n"
+ + " --where <predicate> Filter predicate to select files\n"
+ + " Example: 'year = 2024 and status
= ''active'''\n"
+ + " --options <json> JSON map of Iceberg rewrite
options\n"
+ + " Example:
'{\"min-input-files\":\"2\"}'\n"
+ + " --spark-conf <json> JSON map of custom Spark
configurations\n"
+ + " Example:
'{\"spark.sql.shuffle.partitions\":\"200\"}'\n"
+ + " Note: Cannot override catalog,
extensions, or app name configs\n"
+ + "\n"
+ + "Examples:\n"
+ + " # Basic binpack\n"
+ + " --catalog iceberg_prod --table db.sample\n"
+ + "\n"
+ + " # Sort by columns\n"
+ + " --catalog iceberg_prod --table db.sample --strategy sort \\\n"
+ + " --sort-order 'id DESC NULLS LAST'\n"
+ + "\n"
+ + " # With filter and options\n"
+ + " --catalog iceberg_prod --table db.sample --where 'year = 2024
and status = ''active''' \\\n"
+ + " --options
'{\"min-input-files\":\"2\",\"remove-dangling-deletes\":\"true\"}'\n"
+ + "\n"
+ + " # With custom Spark configurations\n"
+ + " --catalog iceberg_prod --table db.sample --strategy binpack
\\\n"
+ + " --spark-conf
'{\"spark.sql.shuffle.partitions\":\"200\",\"spark.executor.memory\":\"4g\"}'");
+ }
+
+ /**
+ * Parse options from JSON string using Jackson for robust parsing.
+ *
+ * <p>Expected format: {"key1": "value1", "key2": "value2"}
+ *
+ * <p>This method uses Jackson ObjectMapper to properly handle:
+ *
+ * <ul>
+ * <li>Escaped quotes in values
+ * <li>Colons and commas in values
+ * <li>Complex JSON structures
+ * <li>Various data types (strings, numbers, booleans)
+ * </ul>
+ *
+ * @param optionsJson JSON string
+ * @return map of option keys to values
+ */
+ static Map<String, String> parseOptionsJson(String optionsJson) {
+ Map<String, String> options = new HashMap<>();
+ if (optionsJson == null || optionsJson.isEmpty()) {
+ return options;
+ }
+
+ try {
+ ObjectMapper mapper = new ObjectMapper();
+ // Parse JSON into a Map<String, Object> to handle various value types
+ Map<String, Object> parsedMap =
+ mapper.readValue(optionsJson, new TypeReference<Map<String,
Object>>() {});
+
+ // Convert all values to strings
+ for (Map.Entry<String, Object> entry : parsedMap.entrySet()) {
+ String key = entry.getKey();
+ Object value = entry.getValue();
+ // Convert value to string - handles strings, numbers, booleans, etc.
+ options.put(key, value == null ? "" : value.toString());
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(
+ "Failed to parse options JSON: " + optionsJson + ". Error: " +
e.getMessage(), e);
+ }
+
+ return options;
+ }
+
+ /**
+ * Build template arguments list with named argument format.
+ *
+ * @return list of template arguments
+ */
+ private static List<String> buildArguments() {
+ return Arrays.asList(
+ "--catalog",
+ "{{catalog_name}}",
+ "--table",
+ "{{table_identifier}}",
+ "--strategy",
+ "{{strategy}}",
+ "--sort-order",
+ "{{sort_order}}",
+ "--where",
+ "{{where_clause}}",
+ "--options",
+ "{{options}}",
+ "--spark-conf",
+ "{{spark_conf}}");
+ }
+
+ /**
+ * Build Spark configuration template.
+ *
+ * @return map of Spark configuration keys to template values
+ */
+ private static Map<String, String> buildSparkConfigs() {
+ Map<String, String> configs = new HashMap<>();
+
+ // Spark runtime configs
+ configs.put("spark.master", "{{spark_master}}");
+ configs.put("spark.executor.instances", "{{spark_executor_instances}}");
+ configs.put("spark.executor.cores", "{{spark_executor_cores}}");
+ configs.put("spark.executor.memory", "{{spark_executor_memory}}");
+ configs.put("spark.driver.memory", "{{spark_driver_memory}}");
+
+ // Iceberg catalog configuration
+ configs.put("spark.sql.catalog.{{catalog_name}}",
"org.apache.iceberg.spark.SparkCatalog");
+ configs.put("spark.sql.catalog.{{catalog_name}}.type", "{{catalog_type}}");
+ configs.put("spark.sql.catalog.{{catalog_name}}.uri", "{{catalog_uri}}");
+ configs.put("spark.sql.catalog.{{catalog_name}}.warehouse",
"{{warehouse_location}}");
+
+ // Iceberg extensions
+ configs.put(
+ "spark.sql.extensions",
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
+
+ return Collections.unmodifiableMap(configs);
+ }
+}
diff --git
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergRewriteDataFilesJob.java
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergRewriteDataFilesJob.java
new file mode 100644
index 0000000000..51bb2ba108
--- /dev/null
+++
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergRewriteDataFilesJob.java
@@ -0,0 +1,711 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.Map;
+import org.apache.gravitino.job.JobTemplateProvider;
+import org.apache.gravitino.job.SparkJobTemplate;
+import org.junit.jupiter.api.Test;
+
+public class TestIcebergRewriteDataFilesJob {
+
+ @Test
+ public void testJobTemplateHasCorrectName() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ assertNotNull(template);
+ assertEquals("builtin-iceberg-rewrite-data-files", template.name());
+ }
+
+ @Test
+ public void testJobTemplateHasComment() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ assertNotNull(template.comment());
+ assertFalse(template.comment().trim().isEmpty());
+ assertTrue(template.comment().contains("Iceberg"));
+ }
+
+ @Test
+ public void testJobTemplateHasExecutable() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ assertNotNull(template.executable());
+ assertFalse(template.executable().trim().isEmpty());
+ }
+
+ @Test
+ public void testJobTemplateHasMainClass() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ assertNotNull(template.className());
+ assertEquals(IcebergRewriteDataFilesJob.class.getName(),
template.className());
+ }
+
+ @Test
+ public void testJobTemplateHasArguments() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ assertNotNull(template.arguments());
+ assertEquals(14, template.arguments().size()); // 7 flags * 2 (flag +
value)
+
+ // Verify all expected arguments are present
+ assertTrue(template.arguments().contains("--catalog"));
+ assertTrue(template.arguments().contains("{{catalog_name}}"));
+ assertTrue(template.arguments().contains("--table"));
+ assertTrue(template.arguments().contains("{{table_identifier}}"));
+ assertTrue(template.arguments().contains("--strategy"));
+ assertTrue(template.arguments().contains("{{strategy}}"));
+ assertTrue(template.arguments().contains("--sort-order"));
+ assertTrue(template.arguments().contains("{{sort_order}}"));
+ assertTrue(template.arguments().contains("--where"));
+ assertTrue(template.arguments().contains("{{where_clause}}"));
+ assertTrue(template.arguments().contains("--options"));
+ assertTrue(template.arguments().contains("{{options}}"));
+ assertTrue(template.arguments().contains("--spark-conf"));
+ assertTrue(template.arguments().contains("{{spark_conf}}"));
+ }
+
+ @Test
+ public void testJobTemplateHasSparkConfigs() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ Map<String, String> configs = template.configs();
+ assertNotNull(configs);
+ assertFalse(configs.isEmpty());
+
+ // Verify Spark runtime configs
+ assertTrue(configs.containsKey("spark.master"));
+ assertTrue(configs.containsKey("spark.executor.instances"));
+ assertTrue(configs.containsKey("spark.executor.cores"));
+ assertTrue(configs.containsKey("spark.executor.memory"));
+ assertTrue(configs.containsKey("spark.driver.memory"));
+
+ // Verify Iceberg catalog configs
+ assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}"));
+ assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.type"));
+ assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.uri"));
+
assertTrue(configs.containsKey("spark.sql.catalog.{{catalog_name}}.warehouse"));
+
+ // Verify Iceberg extensions
+ assertTrue(configs.containsKey("spark.sql.extensions"));
+ assertEquals(
+ "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
+ configs.get("spark.sql.extensions"));
+ }
+
+ @Test
+ public void testJobTemplateHasVersion() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+ Map<String, String> customFields = template.customFields();
+ assertNotNull(customFields);
+
assertTrue(customFields.containsKey(JobTemplateProvider.PROPERTY_VERSION_KEY));
+
+ String version =
customFields.get(JobTemplateProvider.PROPERTY_VERSION_KEY);
+ assertEquals("v1", version);
+ assertTrue(version.matches(JobTemplateProvider.VERSION_VALUE_PATTERN));
+ }
+
+ @Test
+ public void testJobTemplateNameMatchesBuiltInPattern() {
+ IcebergRewriteDataFilesJob job = new IcebergRewriteDataFilesJob();
+ SparkJobTemplate template = job.jobTemplate();
+
+
assertTrue(template.name().matches(JobTemplateProvider.BUILTIN_NAME_PATTERN));
+
assertTrue(template.name().startsWith(JobTemplateProvider.BUILTIN_NAME_PREFIX));
+ }
+
+ // Test parseArguments method
+
+ @Test
+ public void testParseArgumentsWithAllRequired() {
+ String[] args = {"--catalog", "iceberg_prod", "--table", "db.sample"};
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseArguments(args);
+
+ assertEquals(2, result.size());
+ assertEquals("iceberg_prod", result.get("catalog"));
+ assertEquals("db.sample", result.get("table"));
+ }
+
+ @Test
+ public void testParseArgumentsWithOptional() {
+ String[] args = {
+ "--catalog", "iceberg_prod",
+ "--table", "db.sample",
+ "--strategy", "binpack",
+ "--where", "year = 2024"
+ };
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseArguments(args);
+
+ assertEquals(4, result.size());
+ assertEquals("iceberg_prod", result.get("catalog"));
+ assertEquals("db.sample", result.get("table"));
+ assertEquals("binpack", result.get("strategy"));
+ assertEquals("year = 2024", result.get("where"));
+ }
+
+ @Test
+ public void testParseArgumentsWithEmptyValues() {
+ String[] args = {"--catalog", "iceberg_prod", "--table", "db.sample",
"--strategy", ""};
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseArguments(args);
+
+ // Empty values should be ignored
+ assertEquals(2, result.size());
+ assertEquals("iceberg_prod", result.get("catalog"));
+ assertEquals("db.sample", result.get("table"));
+ assertFalse(result.containsKey("strategy"));
+ }
+
+ @Test
+ public void testParseArgumentsWithMissingValues() {
+ String[] args = {"--catalog", "iceberg_prod", "--table"};
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseArguments(args);
+
+ // Only catalog should be parsed, table has no value
+ assertEquals(1, result.size());
+ assertEquals("iceberg_prod", result.get("catalog"));
+ assertFalse(result.containsKey("table"));
+ }
+
+ @Test
+ public void testParseArgumentsWithComplexValues() {
+ String[] args = {
+ "--catalog", "iceberg_prod",
+ "--table", "db.sample",
+ "--where", "year = 2024 and month = 1",
+ "--options", "{\"min-input-files\":\"2\"}"
+ };
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseArguments(args);
+
+ assertEquals(4, result.size());
+ assertEquals("year = 2024 and month = 1", result.get("where"));
+ assertEquals("{\"min-input-files\":\"2\"}", result.get("options"));
+ }
+
+ @Test
+ public void testParseArgumentsWithSortOrder() {
+ String[] args = {
+ "--catalog", "iceberg_prod",
+ "--table", "db.sample",
+ "--strategy", "sort",
+ "--sort-order", "id DESC NULLS LAST, name ASC"
+ };
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseArguments(args);
+
+ assertEquals(4, result.size());
+ assertEquals("sort", result.get("strategy"));
+ assertEquals("id DESC NULLS LAST, name ASC", result.get("sort-order"));
+ }
+
+ @Test
+ public void testParseArgumentsOrderIndependent() {
+ String[] args1 = {"--catalog", "cat1", "--table", "tbl1", "--strategy",
"binpack"};
+ String[] args2 = {"--strategy", "binpack", "--table", "tbl1", "--catalog",
"cat1"};
+
+ Map<String, String> result1 =
IcebergRewriteDataFilesJob.parseArguments(args1);
+ Map<String, String> result2 =
IcebergRewriteDataFilesJob.parseArguments(args2);
+
+ assertEquals(result1, result2);
+ }
+
+ // Test parseOptionsJson method
+
+ @Test
+ public void testParseOptionsJsonWithSingleOption() {
+ String json = "{\"min-input-files\":\"2\"}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(1, result.size());
+ assertEquals("2", result.get("min-input-files"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithMultipleOptions() {
+ String json =
"{\"min-input-files\":\"2\",\"target-file-size-bytes\":\"536870912\"}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(2, result.size());
+ assertEquals("2", result.get("min-input-files"));
+ assertEquals("536870912", result.get("target-file-size-bytes"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithBooleanValues() {
+ String json =
+
"{\"rewrite-all\":\"true\",\"partial-progress.enabled\":\"true\",\"remove-dangling-deletes\":\"false\"}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(3, result.size());
+ assertEquals("true", result.get("rewrite-all"));
+ assertEquals("true", result.get("partial-progress.enabled"));
+ assertEquals("false", result.get("remove-dangling-deletes"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithEmptyString() {
+ String json = "";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testParseOptionsJsonWithNull() {
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(null);
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testParseOptionsJsonWithEmptyObject() {
+ String json = "{}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ public void testParseOptionsJsonWithColonsInValue() {
+ // Test handling of colons in values (e.g., file paths)
+ String json =
"{\"path\":\"/data/file:backup\",\"url\":\"http://example.com:8080\"}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(2, result.size());
+ assertEquals("/data/file:backup", result.get("path"));
+ assertEquals("http://example.com:8080", result.get("url"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithCommasInValue() {
+ // Test handling of commas in values
+ String json = "{\"list\":\"item1,item2,item3\",\"description\":\"a,b,c\"}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(2, result.size());
+ assertEquals("item1,item2,item3", result.get("list"));
+ assertEquals("a,b,c", result.get("description"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithEscapedQuotes() {
+ // Test handling of escaped quotes in values
+ String json = "{\"message\":\"He said
\\\"hello\\\"\",\"name\":\"O'Brien\"}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(2, result.size());
+ assertEquals("He said \"hello\"", result.get("message"));
+ assertEquals("O'Brien", result.get("name"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithNumericValues() {
+ // Test handling of numeric values (should be converted to strings)
+ String json = "{\"max-size\":1073741824,\"min-files\":2,\"ratio\":0.75}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(3, result.size());
+ assertEquals("1073741824", result.get("max-size"));
+ assertEquals("2", result.get("min-files"));
+ assertEquals("0.75", result.get("ratio"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithBooleanTypes() {
+ // Test handling of boolean values (not strings)
+ String json = "{\"enabled\":true,\"disabled\":false}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(2, result.size());
+ assertEquals("true", result.get("enabled"));
+ assertEquals("false", result.get("disabled"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithComplexValue() {
+ // Test with a realistic complex case
+ String json =
+ "{\"file-path\":\"/data/path:backup,archive\","
+ + "\"description\":\"Rewrite with strategy: binpack, sort\","
+ + "\"max-size\":536870912,"
+ + "\"enabled\":true}";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(4, result.size());
+ assertEquals("/data/path:backup,archive", result.get("file-path"));
+ assertEquals("Rewrite with strategy: binpack, sort",
result.get("description"));
+ assertEquals("536870912", result.get("max-size"));
+ assertEquals("true", result.get("enabled"));
+ }
+
+ @Test
+ public void testParseOptionsJsonWithInvalidJson() {
+ // Test that invalid JSON throws an exception
+ String json = "{invalid json}";
+ try {
+ IcebergRewriteDataFilesJob.parseOptionsJson(json);
+ fail("Expected IllegalArgumentException for invalid JSON");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Failed to parse options JSON"));
+ }
+ }
+
+ @Test
+ public void testParseOptionsJsonWithSpaces() {
+ String json = "{ \"min-input-files\" : \"2\" , \"target-file-size-bytes\"
: \"1024\" }";
+ Map<String, String> result =
IcebergRewriteDataFilesJob.parseOptionsJson(json);
+
+ assertEquals(2, result.size());
+ assertEquals("2", result.get("min-input-files"));
+ assertEquals("1024", result.get("target-file-size-bytes"));
+ }
+
+ // Test buildProcedureCall method
+
+ @Test
+ public void testBuildProcedureCallMinimal() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", null, null, null, null);
+
+ assertEquals("CALL iceberg_prod.system.rewrite_data_files(table =>
'db.sample')", sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithStrategy() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", "binpack", null, null, null);
+
+ assertEquals(
+ "CALL iceberg_prod.system.rewrite_data_files(table => 'db.sample',
strategy => 'binpack')",
+ sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithSortOrder() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", "sort", "id DESC NULLS LAST", null,
null);
+
+ assertEquals(
+ "CALL iceberg_prod.system.rewrite_data_files(table => 'db.sample',
strategy => 'sort', sort_order => 'id DESC NULLS LAST')",
+ sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithWhere() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", null, null, "year = 2024", null);
+
+ assertEquals(
+ "CALL iceberg_prod.system.rewrite_data_files(table => 'db.sample',
where => 'year = 2024')",
+ sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithOptions() {
+ String optionsJson =
"{\"min-input-files\":\"2\",\"target-file-size-bytes\":\"536870912\"}";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", null, null, null, optionsJson);
+
+ assertTrue(sql.startsWith("CALL
iceberg_prod.system.rewrite_data_files(table => 'db.sample'"));
+ assertTrue(sql.contains("options => map("));
+ assertTrue(sql.contains("'min-input-files', '2'"));
+ assertTrue(sql.contains("'target-file-size-bytes', '536870912'"));
+ assertTrue(sql.endsWith(")"));
+ }
+
+ @Test
+ public void testBuildProcedureCallWithAllParameters() {
+ String optionsJson = "{\"min-input-files\":\"2\"}";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", "sort", "id DESC NULLS LAST", "year =
2024", optionsJson);
+
+ assertTrue(sql.startsWith("CALL iceberg_prod.system.rewrite_data_files("));
+ assertTrue(sql.contains("table => 'db.sample'"));
+ assertTrue(sql.contains("strategy => 'sort'"));
+ assertTrue(sql.contains("sort_order => 'id DESC NULLS LAST'"));
+ assertTrue(sql.contains("where => 'year = 2024'"));
+ assertTrue(sql.contains("options => map('min-input-files', '2')"));
+ assertTrue(sql.endsWith(")"));
+ }
+
+ @Test
+ public void testBuildProcedureCallWithEmptyStrategy() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", "", null, null, null);
+
+ assertEquals("CALL iceberg_prod.system.rewrite_data_files(table =>
'db.sample')", sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithEmptyOptions() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", null, null, null, "{}");
+
+ assertEquals("CALL iceberg_prod.system.rewrite_data_files(table =>
'db.sample')", sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithZOrder() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", "sort", "zorder(c1,c2,c3)", null,
null);
+
+ assertEquals(
+ "CALL iceberg_prod.system.rewrite_data_files(table => 'db.sample',
strategy => 'sort', sort_order => 'zorder(c1,c2,c3)')",
+ sql);
+ }
+
+ @Test
+ public void testBuildProcedureCallWithComplexWhere() {
+ String whereClause = "year = 2024 AND month >= 1 AND month <= 12 AND
status = 'active'";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", null, null, whereClause, null);
+
+ // Single quotes in the WHERE clause should be escaped
+ assertTrue(
+ sql.contains(
+ "where => 'year = 2024 AND month >= 1 AND month <= 12 AND status =
''active'''"));
+ }
+
+ @Test
+ public void testBuildProcedureCallWithMultipleOptions() {
+ String optionsJson =
+
"{\"min-input-files\":\"5\",\"target-file-size-bytes\":\"1073741824\",\"remove-dangling-deletes\":\"true\"}";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_prod", "db.sample", "binpack", null, null, optionsJson);
+
+ assertTrue(sql.contains("'min-input-files', '5'"));
+ assertTrue(sql.contains("'target-file-size-bytes', '1073741824'"));
+ assertTrue(sql.contains("'remove-dangling-deletes', 'true'"));
+ }
+
+ @Test
+ public void testEscapeSqlString() {
+ // Test basic escaping of single quotes
+ assertEquals("O''Brien",
IcebergRewriteDataFilesJob.escapeSqlString("O'Brien"));
+ assertEquals(
+ "test''with''quotes",
IcebergRewriteDataFilesJob.escapeSqlString("test'with'quotes"));
+
+ // Test strings without quotes remain unchanged
+ assertEquals("normal_string",
IcebergRewriteDataFilesJob.escapeSqlString("normal_string"));
+
+ // Test null and empty
+ assertEquals(null, IcebergRewriteDataFilesJob.escapeSqlString(null));
+ assertEquals("", IcebergRewriteDataFilesJob.escapeSqlString(""));
+ }
+
+ @Test
+ public void testEscapeSqlIdentifier() {
+ // Test basic escaping of backticks
+ assertEquals("catalog``name",
IcebergRewriteDataFilesJob.escapeSqlIdentifier("catalog`name"));
+
+ // Test strings without backticks remain unchanged
+ assertEquals(
+ "normal_catalog",
IcebergRewriteDataFilesJob.escapeSqlIdentifier("normal_catalog"));
+
+ // Test null
+ assertEquals(null, IcebergRewriteDataFilesJob.escapeSqlIdentifier(null));
+ }
+
+ @Test
+ public void testBuildProcedureCallWithSqlInjectionAttempt() {
+ // Test SQL injection attempt in table name
+ String maliciousTable = "db.table' OR '1'='1";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_catalog", maliciousTable, null, null, null, null);
+
+ // Verify single quotes are escaped (becomes '')
+ assertTrue(sql.contains("db.table'' OR ''1''=''1"));
+ assertFalse(sql.contains("' OR '1'='1"));
+
+ // Test SQL injection attempt in where clause
+ String maliciousWhere = "year = 2024' OR '1'='1";
+ sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "iceberg_catalog", "db.table", null, null, maliciousWhere, null);
+
+ assertTrue(sql.contains("year = 2024'' OR ''1''=''1"));
+
+ // Test SQL injection attempt in catalog name
+ String maliciousCatalog = "catalog`; DROP TABLE users; --";
+ sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ maliciousCatalog, "db.table", null, null, null, null);
+
+ // Verify backticks are escaped
+ assertTrue(sql.contains("catalog``; DROP TABLE users; --"));
+ }
+
+ @Test
+ public void testBuildProcedureCallEscapesAllParameters() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ "cat'alog", "db'.table", "sort'", "id' DESC", "year' = 2024",
"{\"key'\":\"val'ue\"}");
+
+ // Catalog name uses backtick escaping (but no backticks here, so
unchanged)
+ assertTrue(sql.contains("cat'alog"));
+ // All single quotes in string literals should be escaped
+ assertTrue(sql.contains("db''.table"));
+ assertTrue(sql.contains("sort''"));
+ assertTrue(sql.contains("id'' DESC"));
+ assertTrue(sql.contains("year'' = 2024"));
+ assertTrue(sql.contains("key''"));
+ assertTrue(sql.contains("val''ue"));
+ }
+
+ // Tests for strategy validation
+
+ @Test
+ public void testValidateStrategyWithValidBinpack() {
+ // Should not throw exception
+ IcebergRewriteDataFilesJob.validateStrategy("binpack");
+ }
+
+ @Test
+ public void testValidateStrategyWithValidSort() {
+ // Should not throw exception
+ IcebergRewriteDataFilesJob.validateStrategy("sort");
+ }
+
+ @Test
+ public void testValidateStrategyWithNull() {
+ // Should not throw exception - strategy is optional
+ IcebergRewriteDataFilesJob.validateStrategy(null);
+ }
+
+ @Test
+ public void testValidateStrategyWithEmptyString() {
+ // Should not throw exception - strategy is optional
+ IcebergRewriteDataFilesJob.validateStrategy("");
+ }
+
+ @Test
+ public void testValidateStrategyWithInvalidValue() {
+ try {
+ IcebergRewriteDataFilesJob.validateStrategy("invalid");
+ fail("Expected IllegalArgumentException for invalid strategy");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Invalid strategy 'invalid'"));
+ assertTrue(e.getMessage().contains("'binpack'"));
+ assertTrue(e.getMessage().contains("'sort'"));
+ }
+ }
+
+ @Test
+ public void testValidateStrategyWithZorder() {
+ // z-order is a sort order, not a strategy
+ try {
+ IcebergRewriteDataFilesJob.validateStrategy("z-order");
+ fail("Expected IllegalArgumentException for z-order as strategy");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Invalid strategy 'z-order'"));
+ assertTrue(e.getMessage().contains("Valid values are: 'binpack',
'sort'"));
+ }
+ }
+
+ @Test
+ public void testValidateStrategyIsCaseSensitive() {
+ // Strategy validation should be case-sensitive
+ try {
+ IcebergRewriteDataFilesJob.validateStrategy("BINPACK");
+ fail("Expected IllegalArgumentException for uppercase BINPACK");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Invalid strategy 'BINPACK'"));
+ }
+ }
+
+ // Tests for custom Spark configurations
+
+ @Test
+ public void testParseCustomSparkConfigsWithValidJson() {
+ String json =
"{\"spark.sql.shuffle.partitions\":\"200\",\"spark.executor.memory\":\"4g\"}";
+ Map<String, String> configs =
IcebergRewriteDataFilesJob.parseCustomSparkConfigs(json);
+
+ assertEquals(2, configs.size());
+ assertEquals("200", configs.get("spark.sql.shuffle.partitions"));
+ assertEquals("4g", configs.get("spark.executor.memory"));
+ }
+
+ @Test
+ public void testParseCustomSparkConfigsWithNumericValues() {
+ String json =
"{\"spark.sql.shuffle.partitions\":200,\"spark.executor.cores\":4}";
+ Map<String, String> configs =
IcebergRewriteDataFilesJob.parseCustomSparkConfigs(json);
+
+ assertEquals(2, configs.size());
+ assertEquals("200", configs.get("spark.sql.shuffle.partitions"));
+ assertEquals("4", configs.get("spark.executor.cores"));
+ }
+
+ @Test
+ public void testParseCustomSparkConfigsWithBooleanValues() {
+ String json =
"{\"spark.dynamicAllocation.enabled\":true,\"spark.speculation\":false}";
+ Map<String, String> configs =
IcebergRewriteDataFilesJob.parseCustomSparkConfigs(json);
+
+ assertEquals(2, configs.size());
+ assertEquals("true", configs.get("spark.dynamicAllocation.enabled"));
+ assertEquals("false", configs.get("spark.speculation"));
+ }
+
+ @Test
+ public void testParseCustomSparkConfigsWithEmptyString() {
+ Map<String, String> configs =
IcebergRewriteDataFilesJob.parseCustomSparkConfigs("");
+ assertTrue(configs.isEmpty());
+ }
+
+ @Test
+ public void testParseCustomSparkConfigsWithNull() {
+ Map<String, String> configs =
IcebergRewriteDataFilesJob.parseCustomSparkConfigs(null);
+ assertTrue(configs.isEmpty());
+ }
+
+ @Test
+ public void testParseCustomSparkConfigsWithInvalidJson() {
+ String json = "{invalid json}";
+ try {
+ IcebergRewriteDataFilesJob.parseCustomSparkConfigs(json);
+ fail("Expected IllegalArgumentException for invalid JSON");
+ } catch (IllegalArgumentException e) {
+ assertTrue(e.getMessage().contains("Failed to parse Spark configurations
JSON"));
+ }
+ }
+}
diff --git
a/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergRewriteDataFilesJobWithSpark.java
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergRewriteDataFilesJobWithSpark.java
new file mode 100644
index 0000000000..2ce3270a20
--- /dev/null
+++
b/maintenance/jobs/src/test/java/org/apache/gravitino/maintenance/jobs/iceberg/TestIcebergRewriteDataFilesJobWithSpark.java
@@ -0,0 +1,435 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.gravitino.maintenance.jobs.iceberg;
+
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.io.File;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+
/**
 * Integration tests for IcebergRewriteDataFilesJob that use a real Spark session to verify the
 * generated SQL procedure calls.
 *
 * <p>A single local Spark session with a Hadoop-type Iceberg catalog (backed by a temp warehouse
 * directory) is shared across all tests in this class.
 */
public class TestIcebergRewriteDataFilesJobWithSpark {

  // JUnit-managed temp directory; holds the Iceberg warehouse for the run.
  @TempDir static File tempDir;

  private static SparkSession spark;
  private static String catalogName;
  private static String warehousePath;

  @BeforeAll
  public static void setUp() {
    warehousePath = new File(tempDir, "warehouse").getAbsolutePath();
    catalogName = "test_catalog";

    // Local Spark session with the Iceberg SQL extensions enabled so the
    // CALL <catalog>.system.rewrite_data_files(...) procedure syntax is available.
    spark =
        SparkSession.builder()
            .appName("TestIcebergRewriteDataFilesJob")
            .master("local[2]")
            .config(
                "spark.sql.extensions",
                "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
            .config("spark.sql.catalog." + catalogName, "org.apache.iceberg.spark.SparkCatalog")
            // Hadoop-type catalog: metadata lives directly under the warehouse path,
            // so no external metastore service is required.
            .config("spark.sql.catalog." + catalogName + ".type", "hadoop")
            .config("spark.sql.catalog." + catalogName + ".warehouse", warehousePath)
            .getOrCreate();

    // Create a test table with data
    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".db");
    spark.sql(
        "CREATE TABLE IF NOT EXISTS "
            + catalogName
            + ".db.test_table (id INT, name STRING, value DOUBLE) USING iceberg");

    // Insert test data
    spark.sql(
        "INSERT INTO "
            + catalogName
            + ".db.test_table VALUES (1, 'Alice', 100.0), (2, 'Bob', 200.0), (3, 'Charlie', 300.0)");
  }
+
+ @AfterAll
+ public static void tearDown() {
+ if (spark != null) {
+ spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db.test_table");
+ spark.sql("DROP NAMESPACE IF EXISTS " + catalogName + ".db");
+ spark.stop();
+ }
+ }
+
+ @Test
+ public void testBuildProcedureCallGeneratesValidSQL() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", null, null, null, null);
+
+ assertNotNull(sql);
+ assertTrue(sql.startsWith("CALL " + catalogName +
".system.rewrite_data_files("));
+ assertTrue(sql.contains("table => 'db.test_table'"));
+ }
+
+ @Test
+ public void testExecuteRewriteDataFilesMinimal() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", null, null, null, null);
+
+ // Execute the procedure
+ Dataset<Row> result = spark.sql(sql);
+ Row[] rows = (Row[]) result.collect();
+
+ // Verify we got a result
+ assertNotNull(rows);
+ assertTrue(rows.length > 0);
+
+ // Result columns: rewritten_data_files_count, added_data_files_count,
+ // rewritten_bytes_count, failed_data_files_count (Iceberg 1.6.1)
+ Row resultRow = rows[0];
+ assertTrue(resultRow.size() >= 4, "Result should have at least 4 columns");
+
+ // The counts should be non-negative
+ assertTrue(resultRow.getInt(0) >= 0); // rewritten_data_files_count
+ assertTrue(resultRow.getInt(1) >= 0); // added_data_files_count
+ assertTrue(resultRow.getLong(2) >= 0); // rewritten_bytes_count
+ assertEquals(0, resultRow.getInt(3)); // failed_data_files_count should be 0
+ }
+
+ @Test
+ public void testExecuteRewriteDataFilesWithStrategy() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", "binpack", null, null, null);
+
+ Dataset<Row> result = spark.sql(sql);
+ Row[] rows = (Row[]) result.collect();
+
+ assertNotNull(rows);
+ assertTrue(rows.length > 0);
+ }
+
+ @Test
+ public void testExecuteRewriteDataFilesWithOptions() {
+ String optionsJson = "{\"min-input-files\":\"1\"}";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", null, null, null, optionsJson);
+
+ Dataset<Row> result = spark.sql(sql);
+ Row[] rows = (Row[]) result.collect();
+
+ assertNotNull(rows);
+ assertTrue(rows.length > 0);
+ }
+
+ @Test
+ public void testExecuteRewriteDataFilesWithWhere() {
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", null, null, "id > 1", null);
+
+ Dataset<Row> result = spark.sql(sql);
+ Row[] rows = (Row[]) result.collect();
+
+ assertNotNull(rows);
+ assertTrue(rows.length > 0);
+ }
+
+ @Test
+ public void testExecuteRewriteDataFilesWithAllParameters() {
+ String optionsJson = "{\"min-input-files\":\"1\"}";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", "binpack", null, "id >= 1",
optionsJson);
+
+ Dataset<Row> result = spark.sql(sql);
+ Row[] rows = (Row[]) result.collect();
+
+ assertNotNull(rows);
+ assertTrue(rows.length > 0);
+
+ // Verify all columns are present
+ Row resultRow = rows[0];
+ assertTrue(resultRow.size() >= 4, "Result should have at least 4 columns");
+ }
+
+ @Test
+ public void testTableDataIntegrityAfterRewrite() {
+ // Get initial row count
+ long initialCount =
+ spark.sql("SELECT COUNT(*) FROM " + catalogName +
".db.test_table").first().getLong(0);
+
+ // Execute rewrite
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", "binpack", null, null, null);
+ spark.sql(sql);
+
+ // Verify row count is the same after rewrite
+ long finalCount =
+ spark.sql("SELECT COUNT(*) FROM " + catalogName +
".db.test_table").first().getLong(0);
+ assertEquals(initialCount, finalCount, "Row count should remain the same
after rewrite");
+
+ // Verify data integrity
+ Row[] rows =
+ (Row[]) spark.sql("SELECT * FROM " + catalogName + ".db.test_table
ORDER BY id").collect();
+ assertEquals(3, rows.length);
+ assertEquals(1, rows[0].getInt(0));
+ assertEquals("Alice", rows[0].getString(1));
+ assertEquals(2, rows[1].getInt(0));
+ assertEquals("Bob", rows[1].getString(1));
+ assertEquals(3, rows[2].getInt(0));
+ assertEquals("Charlie", rows[2].getString(1));
+ }
+
+ @Test
+ public void testMultipleOptionsInProcedureCall() {
+ String optionsJson =
"{\"min-input-files\":\"1\",\"target-file-size-bytes\":\"536870912\"}";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ catalogName, "db.test_table", "binpack", null, null, optionsJson);
+
+ // Verify SQL contains both options
+ assertTrue(sql.contains("'min-input-files', '1'"));
+ assertTrue(sql.contains("'target-file-size-bytes', '536870912'"));
+
+ // Execute to verify it's valid
+ Dataset<Row> result = spark.sql(sql);
+ Row[] rows = (Row[]) result.collect();
+
+ assertNotNull(rows);
+ assertTrue(rows.length > 0);
+ }
+
  @Test
  public void testSqlInjectionInTableNameIsEscaped() {
    // Create a table with single quote in the namespace for testing
    spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".db_test");
    spark.sql(
        "CREATE TABLE IF NOT EXISTS "
            + catalogName
            + ".db_test.special_table (id INT, name STRING) USING iceberg");
    spark.sql("INSERT INTO " + catalogName + ".db_test.special_table VALUES (1, 'test')");

    try {
      // Attempt SQL injection with single quotes - should be safely escaped
      String maliciousTable = "db_test.special_table' OR '1'='1";
      String sql =
          IcebergRewriteDataFilesJob.buildProcedureCall(
              catalogName, maliciousTable, null, null, null, null);

      // Verify the SQL is escaped (single quotes become double quotes)
      assertTrue(sql.contains("db_test.special_table'' OR ''1''=''1"));

      // The SQL should fail with table not found (not execute malicious code)
      // because the escaped table name doesn't exist
      try {
        spark.sql(sql);
        // If it somehow succeeds, that's also acceptable (means no injection occurred)
      } catch (Exception e) {
        // Expected: table not found or similar error (not a syntax error).
        // NOTE(review): broad Exception catch is deliberate here - the exact error
        // type varies across Spark/Iceberg versions; only the message is asserted.
        String errorMsg = e.getMessage().toLowerCase();
        assertTrue(
            errorMsg.contains("table")
                || errorMsg.contains("not found")
                || errorMsg.contains("identifier"),
            "Error should be about table not found, not SQL syntax: " + errorMsg);
      }
    } finally {
      // Always drop the helper table/namespace so later tests are unaffected.
      spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db_test.special_table");
      spark.sql("DROP NAMESPACE IF EXISTS " + catalogName + ".db_test");
    }
  }

  @Test
  public void testSqlInjectionInWhereClauseIsEscaped() {
    // Attempt SQL injection in WHERE clause
    String maliciousWhere = "id > 0' OR '1'='1";
    String sql =
        IcebergRewriteDataFilesJob.buildProcedureCall(
            catalogName, "db.test_table", null, null, maliciousWhere, null);

    // Verify escaping occurred
    assertTrue(sql.contains("id > 0'' OR ''1''=''1"));

    // The SQL should fail because the WHERE clause is invalid (not execute malicious code)
    try {
      spark.sql(sql);
      // If it somehow succeeds, verify data integrity wasn't compromised
      long count =
          spark.sql("SELECT COUNT(*) FROM " + catalogName + ".db.test_table").first().getLong(0);
      assertEquals(3, count, "Data should not be modified");
    } catch (Exception e) {
      // Expected: syntax error or invalid WHERE clause
      String errorMsg = e.getMessage().toLowerCase();
      assertTrue(
          errorMsg.contains("syntax") || errorMsg.contains("parse") || errorMsg.contains("invalid"),
          "Error should be about invalid syntax: " + errorMsg);
    }
  }
+
+ @Test
+ public void testSingleQuotesInValidDataAreEscaped() {
+   // Fixture data includes legitimately quoted strings (O'Brien, It's working).
+   spark.sql("CREATE NAMESPACE IF NOT EXISTS " + catalogName + ".db_special");
+   spark.sql(
+       "CREATE TABLE IF NOT EXISTS "
+           + catalogName
+           + ".db_special.data_table (id INT, description STRING) USING iceberg");
+   spark.sql(
+       "INSERT INTO "
+           + catalogName
+           + ".db_special.data_table VALUES (1, 'test'), (2, 'O''Brien'), (3, 'It''s working')");
+
+   try {
+     // A WHERE clause that already uses SQL-style doubled quotes in a literal.
+     String quotedWhere = "description = 'O''Brien'";
+     String generatedSql =
+         IcebergRewriteDataFilesJob.buildProcedureCall(
+             catalogName, "db_special.data_table", null, null, quotedWhere, null);
+
+     // Each pre-doubled quote is escaped again, yielding four single quotes.
+     assertTrue(generatedSql.contains("description = ''O''''Brien''"));
+
+     // Execution should either succeed or fail for a benign reason — never
+     // because an injected destructive statement ran.
+     try {
+       Row[] collected = (Row[]) spark.sql(generatedSql).collect();
+       assertNotNull(collected);
+       assertTrue(collected.length > 0);
+     } catch (Exception e) {
+       String message = e.getMessage().toLowerCase();
+       assertFalse(message.contains("drop"));
+       assertFalse(message.contains("delete"));
+     }
+
+     // Regardless of outcome, the table and its rows must be intact.
+     long remaining =
+         spark
+             .sql("SELECT COUNT(*) FROM " + catalogName + ".db_special.data_table")
+             .first()
+             .getLong(0);
+     assertEquals(3, remaining, "All rows should still exist");
+   } finally {
+     // Always clean up the fixtures, pass or fail.
+     spark.sql("DROP TABLE IF EXISTS " + catalogName + ".db_special.data_table");
+     spark.sql("DROP NAMESPACE IF EXISTS " + catalogName + ".db_special");
+   }
+ }
+
+ @Test
+ public void testBackticksInCatalogNameAreEscaped() {
+ // Test that backticks in catalog name don't break out of identifier
context
+ String maliciousCatalog = "test`catalog";
+ String sql =
+ IcebergRewriteDataFilesJob.buildProcedureCall(
+ maliciousCatalog, "db.test_table", null, null, null, null);
+
+ // Verify backticks are escaped
+ assertTrue(sql.contains("test``catalog"));
+
+ // Should fail with catalog not found (not cause SQL injection)
+ try {
+ spark.sql(sql);
+ } catch (Exception e) {
+ String errorMsg = e.getMessage().toLowerCase();
+ assertTrue(
+ errorMsg.contains("catalog")
+ || errorMsg.contains("not found")
+ || errorMsg.contains("identifier"),
+ "Error should be about catalog not found: " + errorMsg);
+ }
+ }
+
+ @Test
+ public void testJsonParsingWithColonsAndCommas() {
+   // Colons and commas inside JSON string values must not confuse the parser —
+   // they are data, not structural delimiters, when inside quotes.
+   String trickyOptions =
+       "{\"path\":\"/data/file:backup,archive\",\"url\":\"http://example.com:8080\"}";
+   String generatedSql =
+       IcebergRewriteDataFilesJob.buildProcedureCall(
+           catalogName, "db.test_table", null, null, null, trickyOptions);
+
+   // The full values, delimiters included, must land intact in the SQL.
+   assertTrue(generatedSql.contains("'path', '/data/file:backup,archive'"));
+   assertTrue(generatedSql.contains("'url', 'http://example.com:8080'"));
+
+   // These are not real Iceberg options, so execution may fail — but any
+   // failure must come from option validation, not SQL parsing.
+   try {
+     spark.sql(generatedSql);
+   } catch (Exception e) {
+     assertFalse(
+         e.getMessage().toLowerCase().contains("parse"),
+         "Should not be a parsing error: " + e.getMessage());
+   }
+ }
+
+ @Test
+ public void testJsonParsingWithNumericAndBooleanValues() {
+   // Non-string JSON scalars (number, boolean) must be stringified for the
+   // procedure call, since Iceberg options are string-typed.
+   String scalarOptions = "{\"max-file-size-bytes\":1073741824,\"rewrite-all\":true}";
+   String generatedSql =
+       IcebergRewriteDataFilesJob.buildProcedureCall(
+           catalogName, "db.test_table", null, null, null, scalarOptions);
+
+   // Both scalars should appear as quoted string values in the SQL.
+   assertTrue(generatedSql.contains("'max-file-size-bytes', '1073741824'"));
+   assertTrue(generatedSql.contains("'rewrite-all', 'true'"));
+
+   // Execution confirms the statement is well-formed; any failure must not
+   // originate from the JSON handling.
+   try {
+     Row[] collected = (Row[]) spark.sql(generatedSql).collect();
+     assertNotNull(collected);
+   } catch (Exception e) {
+     assertFalse(
+         e.getMessage().toLowerCase().contains("json"),
+         "Should not be a JSON error: " + e.getMessage());
+   }
+ }
+
+ @Test
+ public void testJsonParsingWithEscapedQuotes() {
+   // Escaped double quotes inside a JSON value must survive parsing intact.
+   String optionsJson = "{\"description\":\"Rewrite with \\\"quoted\\\" text\"}";
+   String sql =
+       IcebergRewriteDataFilesJob.buildProcedureCall(
+           catalogName, "db.test_table", null, null, null, optionsJson);
+
+   // The JSON parser unescapes \" to ", and SQL escaping only doubles single
+   // quotes, so the double quotes must appear verbatim in the generated SQL.
+   // The previous assertion OR-ed in sql.contains("description"), which is
+   // always true (the option key is always in the SQL), making the test
+   // vacuous — it could never fail. Assert the exact expected fragment instead.
+   assertTrue(
+       sql.contains("'description', 'Rewrite with \"quoted\" text'"),
+       "Escaped double quotes should be preserved in the generated SQL: " + sql);
+ }
+}