This is an automated email from the ASF dual-hosted git repository.

amogh-jahagirdar pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 57b1211b74 Spark: backport PR #15512 to v3.4, v3.5, v4.0 for WAP branch delete fix (#16245)
57b1211b74 is described below

commit 57b1211b7477f9f4a5e79a4cf6f6505d63ded4e8
Author: Steven Zhen Wu <[email protected]>
AuthorDate: Thu May 7 16:42:57 2026 -0700

    Spark: backport PR #15512 to v3.4, v3.5, v4.0 for WAP branch delete fix (#16245)
    
    * Spark: backport PR #15512 to v3.4, v3.5, v4.0 for WAP branch delete fix
    
    When WAP is enabled via spark.wap.branch, canDeleteWhere() previously
    scanned the main branch while deleteWhere() committed to the WAP branch.
    This could cause canDeleteWhere() to incorrectly approve a metadata-only
    delete based on data that was never on the WAP branch, surfacing as
    "Cannot delete file where some, but not all, rows match filter" at
    commit time.
    
    Resolve the scan branch the same way deleteWhere resolves the write
    branch (with a fallback to main when the WAP branch has not been
    created yet), and pass it through canDeleteUsingMetadata.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
    
    * Spark: add blank lines after if blocks in scanBranchForDelete (style)
    
    Iceberg style requires an empty line between a control flow block and
    the following statement.
    
    Co-Authored-By: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../iceberg/spark/extensions/TestDelete.java       | 56 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    | 31 ++++++++++--
 .../iceberg/spark/extensions/TestDelete.java       | 56 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    | 31 ++++++++++--
 .../iceberg/spark/extensions/TestDelete.java       | 56 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    | 31 ++++++++++--
 6 files changed, 246 insertions(+), 15 deletions(-)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 1dd6db48f7..b106e8fc38 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1395,6 +1395,62 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
         });
   }
 
+  @TestTemplate
+  public void testDeleteToWapBranchCanDeleteWhereScansWapBranch() throws 
NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without 
branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new 
Employee(2, "hr"));
+
+      sql("DELETE FROM %s WHERE id = 1", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", 
tableName))
+          .as("DELETE should remove the matching rows from the WAP branch")
+          .containsExactly(row(0, "hr"), row(2, "hr"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main", tableName))
+          .as("Main branch must not be modified by a WAP-targeted DELETE")
+          .containsExactly(row(1, "hr"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
+  @TestTemplate
+  public void testMetadataDeleteToWapBranchCommitsToWapBranch() throws 
NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without 
branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"), new Employee(5, "eng"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(2, "eng"));
+
+      sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", 
tableName))
+          .as("Metadata delete should remove the hr partition on the WAP 
branch")
+          .containsExactly(row(2, "eng"), row(5, "eng"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main ORDER BY id", 
tableName))
+          .as("Metadata delete must not commit to main when WAP is set")
+          .containsExactly(row(1, "hr"), row(5, "eng"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
   @TestTemplate
   public void testDeleteWithFilterOnNestedColumn() {
     createAndInitNestedColumnsTable();
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 871ef93552..1348afff64 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -57,6 +57,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
@@ -334,11 +335,31 @@ public class SparkTable
       }
     }
 
-    return canDeleteUsingMetadata(deleteExpr);
+    return canDeleteUsingMetadata(deleteExpr, scanBranchForDelete());
+  }
+
+  // Resolves the branch to scan during canDeleteWhere so it matches the 
branch deleteWhere
+  // will commit to. Falls back to main when WAP is configured but the WAP 
branch does not
+  // exist yet, since this is a read scan.
+  private String scanBranchForDelete() {
+    if (branch != null) {
+      return branch;
+    }
+
+    if (!SparkTableUtil.wapEnabled(table())) {
+      return null;
+    }
+
+    String wapBranch = 
sparkSession().conf().get(SparkSQLProperties.WAP_BRANCH, null);
+    if (wapBranch != null && table().refs().containsKey(wapBranch)) {
+      return wapBranch;
+    }
+
+    return null;
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
-  private boolean canDeleteUsingMetadata(Expression deleteExpr) {
+  private boolean canDeleteUsingMetadata(Expression deleteExpr, String 
scanBranch) {
     boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
 
     if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
@@ -353,14 +374,14 @@ public class SparkTable
             .includeColumnStats()
             .ignoreResiduals();
 
-    if (branch != null) {
-      scan = scan.useRef(branch);
+    if (scanBranch != null) {
+      scan = scan.useRef(scanBranch);
     }
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       Map<Integer, Evaluator> evaluators = Maps.newHashMap();
       StrictMetricsEvaluator metricsEvaluator =
-          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), branch), 
deleteExpr);
+          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), 
scanBranch), deleteExpr);
 
       return Iterables.all(
           tasks,
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fbf6ce3559..79d6bea12f 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1422,6 +1422,62 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
         });
   }
 
+  @TestTemplate
+  public void testDeleteToWapBranchCanDeleteWhereScansWapBranch() throws 
NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without 
branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new 
Employee(2, "hr"));
+
+      sql("DELETE FROM %s WHERE id = 1", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", 
tableName))
+          .as("DELETE should remove the matching rows from the WAP branch")
+          .containsExactly(row(0, "hr"), row(2, "hr"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main", tableName))
+          .as("Main branch must not be modified by a WAP-targeted DELETE")
+          .containsExactly(row(1, "hr"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
+  @TestTemplate
+  public void testMetadataDeleteToWapBranchCommitsToWapBranch() throws 
NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without 
branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"), new Employee(5, "eng"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(2, "eng"));
+
+      sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", 
tableName))
+          .as("Metadata delete should remove the hr partition on the WAP 
branch")
+          .containsExactly(row(2, "eng"), row(5, "eng"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main ORDER BY id", 
tableName))
+          .as("Metadata delete must not commit to main when WAP is set")
+          .containsExactly(row(1, "hr"), row(5, "eng"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
   @TestTemplate
   public void testDeleteWithFilterOnNestedColumn() {
     createAndInitNestedColumnsTable();
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 871ef93552..1348afff64 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -57,6 +57,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
@@ -334,11 +335,31 @@ public class SparkTable
       }
     }
 
-    return canDeleteUsingMetadata(deleteExpr);
+    return canDeleteUsingMetadata(deleteExpr, scanBranchForDelete());
+  }
+
+  // Resolves the branch to scan during canDeleteWhere so it matches the 
branch deleteWhere
+  // will commit to. Falls back to main when WAP is configured but the WAP 
branch does not
+  // exist yet, since this is a read scan.
+  private String scanBranchForDelete() {
+    if (branch != null) {
+      return branch;
+    }
+
+    if (!SparkTableUtil.wapEnabled(table())) {
+      return null;
+    }
+
+    String wapBranch = 
sparkSession().conf().get(SparkSQLProperties.WAP_BRANCH, null);
+    if (wapBranch != null && table().refs().containsKey(wapBranch)) {
+      return wapBranch;
+    }
+
+    return null;
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
-  private boolean canDeleteUsingMetadata(Expression deleteExpr) {
+  private boolean canDeleteUsingMetadata(Expression deleteExpr, String 
scanBranch) {
     boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
 
     if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
@@ -353,14 +374,14 @@ public class SparkTable
             .includeColumnStats()
             .ignoreResiduals();
 
-    if (branch != null) {
-      scan = scan.useRef(branch);
+    if (scanBranch != null) {
+      scan = scan.useRef(scanBranch);
     }
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       Map<Integer, Evaluator> evaluators = Maps.newHashMap();
       StrictMetricsEvaluator metricsEvaluator =
-          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), branch), 
deleteExpr);
+          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), 
scanBranch), deleteExpr);
 
       return Iterables.all(
           tasks,
diff --git 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index fbf6ce3559..79d6bea12f 100644
--- 
a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -1422,6 +1422,62 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
         });
   }
 
+  @TestTemplate
+  public void testDeleteToWapBranchCanDeleteWhereScansWapBranch() throws 
NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without 
branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new 
Employee(2, "hr"));
+
+      sql("DELETE FROM %s WHERE id = 1", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", 
tableName))
+          .as("DELETE should remove the matching rows from the WAP branch")
+          .containsExactly(row(0, "hr"), row(2, "hr"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main", tableName))
+          .as("Main branch must not be modified by a WAP-targeted DELETE")
+          .containsExactly(row(1, "hr"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
+  @TestTemplate
+  public void testMetadataDeleteToWapBranchCommitsToWapBranch() throws 
NoSuchTableException {
+    assumeThat(branch).as("WAP branch only works for table identifier without 
branch").isNull();
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+
+    append(tableName, new Employee(1, "hr"), new Employee(5, "eng"));
+
+    spark.conf().set(SparkSQLProperties.WAP_BRANCH, "wap");
+    try {
+      append(tableName, new Employee(0, "hr"), new Employee(2, "eng"));
+
+      sql("DELETE FROM %s WHERE dep = 'hr'", tableName);
+
+      assertThat(sql("SELECT id, dep FROM %s.branch_wap ORDER BY id", 
tableName))
+          .as("Metadata delete should remove the hr partition on the WAP 
branch")
+          .containsExactly(row(2, "eng"), row(5, "eng"));
+      assertThat(sql("SELECT id, dep FROM %s.branch_main ORDER BY id", 
tableName))
+          .as("Metadata delete must not commit to main when WAP is set")
+          .containsExactly(row(1, "hr"), row(5, "eng"));
+    } finally {
+      spark.conf().unset(SparkSQLProperties.WAP_BRANCH);
+    }
+  }
+
   @TestTemplate
   public void testDeleteWithFilterOnNestedColumn() {
     createAndInitNestedColumnsTable();
diff --git 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 6f0f992f1c..9e3c9a7e69 100644
--- 
a/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v4.0/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -58,6 +58,7 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.spark.CommitMetadata;
 import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkSQLProperties;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
@@ -376,11 +377,31 @@ public class SparkTable
       }
     }
 
-    return canDeleteUsingMetadata(deleteExpr);
+    return canDeleteUsingMetadata(deleteExpr, scanBranchForDelete());
+  }
+
+  // Resolves the branch to scan during canDeleteWhere so it matches the 
branch deleteWhere
+  // will commit to. Falls back to main when WAP is configured but the WAP 
branch does not
+  // exist yet, since this is a read scan.
+  private String scanBranchForDelete() {
+    if (branch != null) {
+      return branch;
+    }
+
+    if (!SparkTableUtil.wapEnabled(table())) {
+      return null;
+    }
+
+    String wapBranch = 
sparkSession().conf().get(SparkSQLProperties.WAP_BRANCH, null);
+    if (wapBranch != null && table().refs().containsKey(wapBranch)) {
+      return wapBranch;
+    }
+
+    return null;
   }
 
   // a metadata delete is possible iff matching files can be deleted entirely
-  private boolean canDeleteUsingMetadata(Expression deleteExpr) {
+  private boolean canDeleteUsingMetadata(Expression deleteExpr, String 
scanBranch) {
     boolean caseSensitive = SparkUtil.caseSensitive(sparkSession());
 
     if (ExpressionUtil.selectsPartitions(deleteExpr, table(), caseSensitive)) {
@@ -395,14 +416,14 @@ public class SparkTable
             .includeColumnStats()
             .ignoreResiduals();
 
-    if (branch != null) {
-      scan = scan.useRef(branch);
+    if (scanBranch != null) {
+      scan = scan.useRef(scanBranch);
     }
 
     try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
       Map<Integer, Evaluator> evaluators = Maps.newHashMap();
       StrictMetricsEvaluator metricsEvaluator =
-          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), branch), 
deleteExpr);
+          new StrictMetricsEvaluator(SnapshotUtil.schemaFor(table(), 
scanBranch), deleteExpr);
 
       return Iterables.all(
           tasks,

Reply via email to