This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new c7ed71e970 Spark 4.1: Align handling of branches in reads and writes (#15288)
c7ed71e970 is described below
commit c7ed71e970781ca6c7fa44a3900625600830e9d4
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Feb 16 22:35:36 2026 -0800
Spark 4.1: Align handling of branches in reads and writes (#15288)
---
.../apache/iceberg/spark/PlanningBenchmark.java | 2 +-
.../iceberg/spark/TaskGroupPlanningBenchmark.java | 5 +-
.../iceberg/spark/extensions/TestDelete.java | 16 ++--
.../apache/iceberg/spark/extensions/TestMerge.java | 26 +++---
.../iceberg/spark/extensions/TestUpdate.java | 16 ++--
.../org/apache/iceberg/spark/SparkReadConf.java | 42 +++------
.../org/apache/iceberg/spark/SparkTableUtil.java | 101 +++++++++++++++++----
.../org/apache/iceberg/spark/SparkWriteConf.java | 46 +++-------
.../apache/iceberg/spark/SparkWriteOptions.java | 3 +
.../apache/iceberg/spark/source/SparkTable.java | 2 +-
.../iceberg/SparkDistributedDataScanTestBase.java | 3 +-
.../TestSparkDistributedDataScanDeletes.java | 3 +-
.../TestSparkDistributedDataScanFilterFiles.java | 3 +-
.../TestSparkDistributedDataScanReporting.java | 3 +-
.../TestSparkDistributionAndOrderingUtil.java | 7 +-
.../iceberg/spark/TestSparkExecutorCache.java | 6 +-
.../apache/iceberg/spark/TestSparkWriteConf.java | 50 +++++-----
.../spark/actions/TestRewriteDataFilesAction.java | 2 +-
.../sql/TestPartitionedWritesToWapBranch.java | 26 ++++--
19 files changed, 205 insertions(+), 157 deletions(-)
diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
index 0eff3a847e..c50a3fd406 100644
--- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
+++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
@@ -398,7 +398,7 @@ public class PlanningBenchmark {
.set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
.set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
.commit();
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
return new SparkDistributedDataScan(spark, table, readConf);
}
diff --git a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
index 45c95bf997..8a8097834e 100644
--- a/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
+++ b/spark/v4.1/spark-extensions/src/jmh/java/org/apache/iceberg/spark/TaskGroupPlanningBenchmark.java
@@ -39,7 +39,6 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
import org.apache.iceberg.util.TableScanUtil;
@@ -107,7 +106,7 @@ public class TaskGroupPlanningBenchmark {
@Benchmark
@Threads(1)
public void planTaskGroups(Blackhole blackhole) {
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
List<ScanTaskGroup<FileScanTask>> taskGroups =
TableScanUtil.planTaskGroups(
fileTasks,
@@ -137,7 +136,7 @@ public class TaskGroupPlanningBenchmark {
@Benchmark
@Threads(1)
public void planTaskGroupsWithGrouping(Blackhole blackhole) {
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
List<ScanTaskGroup<FileScanTask>> taskGroups =
TableScanUtil.planTaskGroups(
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fbf6ce3559..7e0f6207ed 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1375,15 +1375,17 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
createBranchIfNeeded();
+ // writing to explicit branch should succeed even with WAP branch set
withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
- () ->
- assertThatThrownBy(() -> sql("DELETE FROM %s t WHERE id=0",
commitTarget()))
- .isInstanceOf(ValidationException.class)
- .hasMessage(
- String.format(
- "Cannot write to both branch and WAP branch, but got
branch [%s] and WAP branch [wap]",
- branch)));
+ () -> {
+ sql("DELETE FROM %s t WHERE id=0", commitTarget());
+
+ assertEquals(
+ "Should have deleted row in explicit branch",
+ ImmutableList.of(row(1, "hr"), row(2, "hr")),
+ sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+ });
}
@TestTemplate
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 3f584031d9..b21d1a4042 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -2983,21 +2983,21 @@ public abstract class TestMerge extends SparkRowLevelOperationsTestBase {
ImmutableList<Object[]> expectedRows =
ImmutableList.of(row(-1), row(0), row(1), row(2), row(3), row(4));
+ // writing to explicit branch should succeed even with WAP branch set
withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
- () ->
- assertThatThrownBy(
- () ->
- sql(
- "MERGE INTO %s t USING source s ON t.id = s.id "
- + "WHEN MATCHED THEN UPDATE SET *"
- + "WHEN NOT MATCHED THEN INSERT *",
- commitTarget()))
- .isInstanceOf(ValidationException.class)
- .hasMessage(
- String.format(
- "Cannot write to both branch and WAP branch, but got
branch [%s] and WAP branch [wap]",
- branch)));
+ () -> {
+ sql(
+ "MERGE INTO %s t USING source s ON t.id = s.id "
+ + "WHEN MATCHED THEN UPDATE SET *"
+ + "WHEN NOT MATCHED THEN INSERT *",
+ commitTarget());
+
+ assertEquals(
+ "Should have expected rows in explicit branch",
+ expectedRows,
+ sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+ });
}
private void checkJoinAndFilterConditions(String query, String join, String icebergFilters) {
diff --git a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 2afbc697e1..79ea5c2138 100644
--- a/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ b/spark/v4.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -1500,15 +1500,17 @@ public abstract class TestUpdate extends SparkRowLevelOperationsTestBase {
"ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+ // writing to explicit branch should succeed even with WAP branch set
withSQLConf(
ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, "wap"),
- () ->
- assertThatThrownBy(() -> sql("UPDATE %s SET dep='hr' WHERE
dep='a'", commitTarget()))
- .isInstanceOf(ValidationException.class)
- .hasMessage(
- String.format(
- "Cannot write to both branch and WAP branch, but got
branch [%s] and WAP branch [wap]",
- branch)));
+ () -> {
+ sql("UPDATE %s SET dep='software' WHERE dep='hr'", commitTarget());
+
+ assertEquals(
+ "Should have updated row in explicit branch",
+ ImmutableList.of(row(1, "software")),
+ sql("SELECT * FROM %s ORDER BY id", commitTarget()));
+ });
}
private RowLevelOperationMode mode(Table table) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
index 238919ace7..ec56e5f239 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java
@@ -20,16 +20,14 @@ package org.apache.iceberg.spark;
import static org.apache.iceberg.PlanningMode.LOCAL;
-import java.util.Map;
import org.apache.iceberg.PlanningMode;
import org.apache.iceberg.SupportsDistributedScanPlanning;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.Util;
-import org.apache.iceberg.util.PropertyUtil;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
/**
* A class for common Iceberg configs for Spark reads.
@@ -58,18 +56,24 @@ public class SparkReadConf {
private final SparkSession spark;
private final Table table;
private final String branch;
+ private final CaseInsensitiveStringMap options;
private final SparkConfParser confParser;
- public SparkReadConf(SparkSession spark, Table table, Map<String, String> readOptions) {
- this(spark, table, null, readOptions);
+ public SparkReadConf(SparkSession spark, Table table) {
+ this(spark, table, CaseInsensitiveStringMap.empty());
+ }
+
+ public SparkReadConf(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+ this(spark, table, null, options);
}
public SparkReadConf(
- SparkSession spark, Table table, String branch, Map<String, String> readOptions) {
+ SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
this.branch = branch;
- this.confParser = new SparkConfParser(spark, table, readOptions);
+ this.options = options;
+ this.confParser = new SparkConfParser(spark, table, options);
}
public boolean caseSensitive() {
@@ -103,29 +107,7 @@ public class SparkReadConf {
}
public String branch() {
- String optionBranch = confParser.stringConf().option(SparkReadOptions.BRANCH).parseOptional();
- ValidationException.check(
- branch == null || optionBranch == null || optionBranch.equals(branch),
- "Must not specify different branches in both table identifier and read
option, "
- + "got [%s] in identifier and [%s] in options",
- branch,
- optionBranch);
- String inputBranch = branch != null ? branch : optionBranch;
- if (inputBranch != null) {
- return inputBranch;
- }
-
- boolean wapEnabled =
- PropertyUtil.propertyAsBoolean(
- table.properties(), TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, false);
- if (wapEnabled) {
- String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
- if (wapBranch != null && table.refs().containsKey(wapBranch)) {
- return wapBranch;
- }
- }
-
- return null;
+ return SparkTableUtil.determineReadBranch(spark, table, branch, options);
}
public String tag() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 0b4578630a..593154d72a 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -60,7 +60,6 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.TableMigrationUtil;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hadoop.Util;
@@ -1025,34 +1024,102 @@ public class SparkTableUtil {
sparkTable, Option.empty(), Option.empty(), options, Option.empty());
}
+ public static String determineWriteBranch(SparkSession spark, Table table, String branch) {
+ return determineWriteBranch(spark, table, branch, CaseInsensitiveStringMap.empty());
+ }
+
/**
* Determine the write branch.
*
- * <p>Validate wap config and determine the write branch.
+ * <p>The target branch can be specified via table identifier, write option, or in SQL:
+ *
+ * <ul>
+ * <li>The identifier and option branches can't conflict. If both are set, they must match.
+ * <li>Identifier and option branches take priority over the session WAP branch.
+ * <li>If neither the option nor the identifier branch is set and WAP is enabled for this table,
+ * use the WAP branch from the session SQL config.
+ * </ul>
+ *
+ * <p>Note: WAP ID and WAP branch cannot be set at the same time.
+ *
+ * <p>Note: The target branch may be created during the write operation if it does not exist.
*
* @param spark a Spark Session
- * @param branch write branch if there is no WAP branch configured
- * @return branch for write operation
+ * @param table the table being written to
+ * @param branch write branch configured via table identifier, or null
+ * @param options write options
+ * @return branch for write operation, or null for main branch
*/
- public static String determineWriteBranch(SparkSession spark, String branch) {
+ public static String determineWriteBranch(
+ SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
+ String optionBranch = options.get(SparkWriteOptions.BRANCH);
+ if (optionBranch != null) {
+ Preconditions.checkArgument(
+ branch == null || optionBranch.equals(branch),
+ "Explicitly configured branch [%s] and write option [%s] are in
conflict",
+ branch,
+ optionBranch);
+ return optionBranch;
+ }
+
+ if (branch == null && wapEnabled(table)) {
+ return wapSessionBranch(spark);
+ }
+
+ return branch;
+ }
+
+ /**
+ * Determine the read branch.
+ *
+ * <p>The target branch can be specified via table identifier, read option, or in SQL:
+ *
+ * <ul>
+ * <li>The identifier and option branches can't conflict. If both are set, they must match.
+ * <li>Identifier and option branches take priority over the session WAP branch.
+ * <li>If neither the option nor the identifier branch is set and WAP is enabled for this table,
+ * use the WAP branch from the session SQL config (only if the branch already exists).
+ * </ul>
+ *
+ * <p>Note: WAP ID and WAP branch cannot be set at the same time.
+ *
+ * @param spark a Spark Session
+ * @param table the table being read from
+ * @param branch read branch configured via table identifier, or null
+ * @param options read options
+ * @return branch for read operation, or null for main branch
+ */
+ public static String determineReadBranch(
+ SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
+ String optionBranch = options.get(SparkReadOptions.BRANCH);
+ if (optionBranch != null) {
+ Preconditions.checkArgument(
+ branch == null || optionBranch.equals(branch),
+ "Explicitly configured branch [%s] and read option [%s] are in
conflict",
+ branch,
+ optionBranch);
+ return optionBranch;
+ }
+
+ if (branch == null && wapEnabled(table)) {
+ String wapBranch = wapSessionBranch(spark);
+ if (wapBranch != null && table.refs().containsKey(wapBranch)) {
+ return wapBranch;
+ }
+ }
+
+ return branch;
+ }
+
+ private static String wapSessionBranch(SparkSession spark) {
String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
- ValidationException.check(
+ Preconditions.checkArgument(
wapId == null || wapBranch == null,
"Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
wapId,
wapBranch);
-
- if (wapBranch != null) {
- ValidationException.check(
- branch == null,
- "Cannot write to both branch and WAP branch, but got branch [%s] and
WAP branch [%s]",
- branch,
- wapBranch);
-
- return wapBranch;
- }
- return branch;
+ return wapBranch;
}
public static boolean wapEnabled(Table table) {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 6648d7ea38..d74754e4d3 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -46,7 +46,6 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableUtil;
import org.apache.iceberg.deletes.DeleteGranularity;
-import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
@@ -55,6 +54,7 @@ import org.apache.spark.sql.RuntimeConfig;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.connector.write.RowLevelOperation.Command;
import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
@@ -88,21 +88,25 @@ public class SparkWriteConf {
private final Table table;
private final String branch;
private final RuntimeConfig sessionConf;
- private final Map<String, String> writeOptions;
+ private final CaseInsensitiveStringMap options;
private final SparkConfParser confParser;
- public SparkWriteConf(SparkSession spark, Table table, Map<String, String> writeOptions) {
- this(spark, table, null, writeOptions);
+ public SparkWriteConf(SparkSession spark, Table table) {
+ this(spark, table, null, CaseInsensitiveStringMap.empty());
+ }
+
+ public SparkWriteConf(SparkSession spark, Table table, CaseInsensitiveStringMap options) {
+ this(spark, table, null, options);
}
public SparkWriteConf(
- SparkSession spark, Table table, String branch, Map<String, String> writeOptions) {
+ SparkSession spark, Table table, String branch, CaseInsensitiveStringMap options) {
this.spark = spark;
this.table = table;
this.branch = branch;
this.sessionConf = spark.conf();
- this.writeOptions = writeOptions;
- this.confParser = new SparkConfParser(spark, table, writeOptions);
+ this.options = options;
+ this.confParser = new SparkConfParser(spark, table, options);
}
public boolean checkNullability() {
@@ -124,7 +128,7 @@ public class SparkWriteConf {
}
public String overwriteMode() {
- String overwriteMode = writeOptions.get(SparkWriteOptions.OVERWRITE_MODE);
+ String overwriteMode = options.get(SparkWriteOptions.OVERWRITE_MODE);
return overwriteMode != null ? overwriteMode.toLowerCase(Locale.ROOT) : null;
}
@@ -262,7 +266,7 @@ public class SparkWriteConf {
// Add write options, overriding session configuration if necessary
extraSnapshotMetadata.putAll(
- PropertyUtil.propertiesWithPrefix(writeOptions, SnapshotSummary.EXTRA_METADATA_PREFIX));
+ PropertyUtil.propertiesWithPrefix(options, SnapshotSummary.EXTRA_METADATA_PREFIX));
return extraSnapshotMetadata;
}
@@ -459,29 +463,7 @@ public class SparkWriteConf {
}
public String branch() {
- if (wapEnabled()) {
- String wapId = wapId();
- String wapBranch =
- confParser.stringConf().sessionConf(SparkSQLProperties.WAP_BRANCH).parseOptional();
-
- ValidationException.check(
- wapId == null || wapBranch == null,
- "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
- wapId,
- wapBranch);
-
- if (wapBranch != null) {
- ValidationException.check(
- branch == null,
- "Cannot write to both branch and WAP branch, but got branch [%s]
and WAP branch [%s]",
- branch,
- wapBranch);
-
- return wapBranch;
- }
- }
-
- return branch;
+ return SparkTableUtil.determineWriteBranch(spark, table, branch, options);
}
public Map<String, String> writeProperties() {
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
index 40816eef2f..86c27acd88 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkWriteOptions.java
@@ -23,6 +23,9 @@ public class SparkWriteOptions {
private SparkWriteOptions() {}
+ // Overrides the target branch for write operations
+ public static final String BRANCH = "branch";
+
// Fileformat for write operations(default: Table write.format.default )
public static final String WRITE_FORMAT = "write-format";
diff --git a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 335d0f72fd..24915a1bfc 100644
--- a/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -390,7 +390,7 @@ public class SparkTable
.deleteFromRowFilter(deleteExpr);
if (SparkTableUtil.wapEnabled(table())) {
- branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch);
+ branch = SparkTableUtil.determineWriteBranch(sparkSession(), icebergTable, branch);
}
if (branch != null) {
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
index 404ba72846..aa4f3dc724 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/SparkDistributedDataScanTestBase.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
import java.util.Arrays;
import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
@@ -81,7 +80,7 @@ public abstract class SparkDistributedDataScanTestBase
@Override
protected BatchScan newScan() {
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
return new SparkDistributedDataScan(spark, table, readConf);
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
index 659507e4c5..6ffaede5b0 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanDeletes.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
import java.util.Arrays;
import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
@@ -85,7 +84,7 @@ public class TestSparkDistributedDataScanDeletes
@Override
protected BatchScan newScan(Table table) {
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
return new SparkDistributedDataScan(spark, table, readConf);
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
index a218f965ea..1e680ace29 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanFilterFiles.java
@@ -21,7 +21,6 @@ package org.apache.iceberg;
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
@@ -79,7 +78,7 @@ public class TestSparkDistributedDataScanFilterFiles
.set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
.set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
.commit();
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
return new SparkDistributedDataScan(spark, table, readConf);
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
index 2665d7ba8d..9b736004de 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/TestSparkDistributedDataScanReporting.java
@@ -23,7 +23,6 @@ import static org.apache.iceberg.PlanningMode.LOCAL;
import java.util.Arrays;
import java.util.List;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.spark.SparkReadConf;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.internal.SQLConf;
@@ -80,7 +79,7 @@ public class TestSparkDistributedDataScanReporting
.set(TableProperties.DATA_PLANNING_MODE, dataMode.modeName())
.set(TableProperties.DELETE_PLANNING_MODE, deleteMode.modeName())
.commit();
- SparkReadConf readConf = new SparkReadConf(spark, table, ImmutableMap.of());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
return new SparkDistributedDataScan(spark, table, readConf);
}
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index ca86350346..21b67b89a9 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -34,7 +34,6 @@ import static org.assertj.core.api.Assertions.assertThat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.spark.sql.connector.distributions.Distribution;
import org.apache.spark.sql.connector.distributions.Distributions;
import org.apache.spark.sql.connector.expressions.Expression;
@@ -2976,7 +2975,7 @@ public class TestSparkDistributionAndOrderingUtil extends TestBaseWithCatalog {
private void checkWriteDistributionAndOrdering(
Table table, Distribution expectedDistribution, SortOrder[] expectedOrdering) {
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
SparkWriteRequirements requirements = writeConf.writeRequirements();
@@ -2992,7 +2991,7 @@ public class TestSparkDistributionAndOrderingUtil extends TestBaseWithCatalog {
Command command,
Distribution expectedDistribution,
SortOrder[] expectedOrdering) {
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
SparkWriteRequirements requirements = writeConf.copyOnWriteRequirements(command);
@@ -3008,7 +3007,7 @@ public class TestSparkDistributionAndOrderingUtil extends TestBaseWithCatalog {
Command command,
Distribution expectedDistribution,
SortOrder[] expectedOrdering) {
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
SparkWriteRequirements requirements = writeConf.positionDeltaRequirements(command);
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
index a411c3fc70..607955971d 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkExecutorCache.java
@@ -191,7 +191,7 @@ public class TestSparkExecutorCache extends TestBaseWithCatalog {
SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "false"),
() -> {
- SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
assertThat(readConf.cacheDeleteFilesOnExecutors()).isFalse();
});
@@ -200,7 +200,7 @@ public class TestSparkExecutorCache extends TestBaseWithCatalog {
SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "true",
SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "true"),
() -> {
- SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
assertThat(readConf.cacheDeleteFilesOnExecutors()).isTrue();
});
@@ -209,7 +209,7 @@ public class TestSparkExecutorCache extends TestBaseWithCatalog {
SparkSQLProperties.EXECUTOR_CACHE_ENABLED, "false",
SparkSQLProperties.EXECUTOR_CACHE_DELETE_FILES_ENABLED, "true"),
() -> {
- SparkReadConf readConf = new SparkReadConf(spark, table, Collections.emptyMap());
+ SparkReadConf readConf = new SparkReadConf(spark, table);
assertThat(readConf.cacheDeleteFilesOnExecutors()).isFalse();
});
}
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
index 61aacfa458..227b93dfa4 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/TestSparkWriteConf.java
@@ -60,6 +60,7 @@ import org.apache.iceberg.deletes.DeleteGranularity;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
@@ -143,7 +144,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
@TestTemplate
public void testDeleteGranularityDefault() {
Table table = validationCatalog.loadTable(tableIdent);
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
DeleteGranularity value = writeConf.deleteGranularity();
assertThat(value).isEqualTo(DeleteGranularity.FILE);
@@ -158,7 +159,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
.set(TableProperties.DELETE_GRANULARITY, DeleteGranularity.PARTITION.toString())
.commit();
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
DeleteGranularity value = writeConf.deleteGranularity();
assertThat(value).isEqualTo(DeleteGranularity.PARTITION);
@@ -176,7 +177,8 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
Map<String, String> options =
ImmutableMap.of(SparkWriteOptions.DELETE_GRANULARITY, DeleteGranularity.FILE.toString());
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, options);
+ SparkWriteConf writeConf =
+ new SparkWriteConf(spark, table, new CaseInsensitiveStringMap(options));
DeleteGranularity value = writeConf.deleteGranularity();
assertThat(value).isEqualTo(DeleteGranularity.FILE);
@@ -188,7 +190,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
table.updateProperties().set(TableProperties.DELETE_GRANULARITY, "invalid").commit();
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
assertThatThrownBy(writeConf::deleteGranularity)
.isInstanceOf(IllegalArgumentException.class)
@@ -199,7 +201,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
public void testAdvisoryPartitionSize() {
Table table = validationCatalog.loadTable(tableIdent);
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
long value1 = writeConf.writeRequirements().advisoryPartitionSize();
assertThat(value1).isGreaterThan(64L * 1024 * 1024).isLessThan(2L * 1024 * 1024 * 1024);
@@ -217,7 +219,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
public void testSparkWriteConfDistributionDefault() {
Table table = validationCatalog.loadTable(tableIdent);
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
checkMode(DistributionMode.HASH, writeConf);
}
@@ -226,8 +228,9 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
public void testSparkWriteConfDistributionModeWithWriteOption() {
Table table = validationCatalog.loadTable(tableIdent);
- Map<String, String> writeOptions =
- ImmutableMap.of(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
+ CaseInsensitiveStringMap writeOptions =
+ new CaseInsensitiveStringMap(
+ ImmutableMap.of(SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()));
SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions);
checkMode(DistributionMode.NONE, writeConf);
@@ -239,7 +242,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
ImmutableMap.of(SparkSQLProperties.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()),
() -> {
Table table = validationCatalog.loadTable(tableIdent);
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
checkMode(DistributionMode.NONE, writeConf);
});
}
@@ -256,7 +259,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
.set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
.commit();
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
checkMode(DistributionMode.NONE, writeConf);
}
@@ -275,7 +278,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
.set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
.commit();
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
// session config overwrite the table properties
checkMode(DistributionMode.NONE, writeConf);
});
@@ -288,9 +291,10 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
() -> {
Table table = validationCatalog.loadTable(tableIdent);
- Map<String, String> writeOptions =
- ImmutableMap.of(
- SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
+ CaseInsensitiveStringMap writeOptions =
+ new CaseInsensitiveStringMap(
+ ImmutableMap.of(
+ SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()));
SparkWriteConf writeConf = new SparkWriteConf(spark, table, writeOptions);
// write options overwrite the session config
@@ -305,9 +309,10 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
() -> {
Table table = validationCatalog.loadTable(tableIdent);
- Map<String, String> writeOptions =
- ImmutableMap.of(
- SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName());
+ CaseInsensitiveStringMap writeOptions =
+ new CaseInsensitiveStringMap(
+ ImmutableMap.of(
+ SparkWriteOptions.DISTRIBUTION_MODE, DistributionMode.NONE.modeName()));
table
.updateProperties()
@@ -402,7 +407,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"),
() -> {
Table table = validationCatalog.loadTable(tableIdent);
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
Map<String, String> metadata = writeConf.extraSnapshotMetadata();
@@ -416,8 +421,9 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
ImmutableMap.of("spark.sql.iceberg.snapshot-property.test-key", "session-value"),
() -> {
Table table = validationCatalog.loadTable(tableIdent);
- Map<String, String> writeOptions =
- ImmutableMap.of("snapshot-property.test-key",
"write-option-value");
+ CaseInsensitiveStringMap writeOptions =
+ new CaseInsensitiveStringMap(
+ ImmutableMap.of("snapshot-property.test-key",
"write-option-value"));
SparkWriteConf writeConf = new SparkWriteConf(spark, table,
writeOptions);
Map<String, String> metadata = writeConf.extraSnapshotMetadata();
@@ -596,7 +602,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
public void testDVWriteConf() {
Table table = validationCatalog.loadTable(tableIdent);
table.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit();
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
assertThat(writeConf.deleteFileFormat()).isEqualTo(FileFormat.PUFFIN);
}
@@ -613,7 +619,7 @@ public class TestSparkWriteConf extends TestBaseWithCatalog {
updateProperties.commit();
- SparkWriteConf writeConf = new SparkWriteConf(spark, table, ImmutableMap.of());
+ SparkWriteConf writeConf = new SparkWriteConf(spark, table);
Map<String, String> writeProperties = writeConf.writeProperties();
Map<String, String> expectedProperties = propertiesSuite.get(2);
assertThat(writeConf.writeProperties()).hasSameSizeAs(expectedProperties);
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
index 6d965f3dcc..c76c75c9a3 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteDataFilesAction.java
@@ -2105,7 +2105,7 @@ public class TestRewriteDataFilesAction extends TestBase {
RewriteDataFilesSparkAction action = SparkActions.get(spark).rewriteDataFiles(table);
// The constructor should have set the configuration to false
- SparkReadConf readConf = new SparkReadConf(action.spark(), table, Collections.emptyMap());
+ SparkReadConf readConf = new SparkReadConf(action.spark(), table);
assertThat(readConf.cacheDeleteFilesOnExecutors())
.as("Executor cache for delete files should be disabled in
RewriteDataFilesSparkAction")
.isFalse();
diff --git a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
index 45268b78f8..1b6334f23e 100644
--- a/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
+++ b/spark/v4.1/spark/src/test/java/org/apache/iceberg/spark/sql/TestPartitionedWritesToWapBranch.java
@@ -25,7 +25,7 @@ import java.util.UUID;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
-import org.apache.iceberg.exceptions.ValidationException;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.spark.SparkSQLProperties;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
@@ -61,15 +61,25 @@ public class TestPartitionedWritesToWapBranch extends PartitionedWritesTestBase
}
@TestTemplate
- public void testBranchAndWapBranchCannotBothBeSetForWrite() {
+ public void testWriteToBranchWithWapBranchSet() {
Table table = validationCatalog.loadTable(tableIdent);
table.manageSnapshots().createBranch("test2",
table.refs().get(BRANCH).snapshotId()).commit();
sql("REFRESH TABLE " + tableName);
- assertThatThrownBy(() -> sql("INSERT INTO %s.branch_test2 VALUES (4,
'd')", tableName))
- .isInstanceOf(ValidationException.class)
- .hasMessage(
- "Cannot write to both branch and WAP branch, but got branch
[test2] and WAP branch [%s]",
- BRANCH);
+
+ // writing to explicit branch should succeed even with WAP branch set
+ sql("INSERT INTO TABLE %s.branch_test2 VALUES (4, 'd')", tableName);
+
+ // verify write went to branch test2
+ assertEquals(
+ "Data should be written to branch test2",
+ ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c"), row(4L, "d")),
+ sql("SELECT * FROM %s VERSION AS OF 'test2' ORDER BY id", tableName));
+
+ // verify current state is not affected
+ assertEquals(
+ "Data should be written to branch test2",
+ ImmutableList.of(row(1L, "a"), row(2L, "b"), row(3L, "c")),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
}
@TestTemplate
@@ -77,7 +87,7 @@ public class TestPartitionedWritesToWapBranch extends PartitionedWritesTestBase
String wapId = UUID.randomUUID().toString();
spark.conf().set(SparkSQLProperties.WAP_ID, wapId);
assertThatThrownBy(() -> sql("INSERT INTO %s VALUES (4, 'd')", tableName))
- .isInstanceOf(ValidationException.class)
+ .isInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Cannot set both WAP ID and branch, but got ID [%s] and branch
[%s]", wapId, BRANCH);
}