This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/1.3.x by this push:
     new 57bfde48cc Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (#8033) (#8036)
57bfde48cc is described below

commit 57bfde48ccb2735139c1fc2637f5f6710365beb6
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jul 11 19:30:22 2023 +0200

    Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (#8033) (#8036)
---
 .../iceberg/spark/extensions/TestDelete.java       | 38 ++++++++++++++++++++++
 .../org/apache/iceberg/spark/SparkTableUtil.java   | 38 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |  5 +++
 3 files changed, 81 insertions(+)

diff --git 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 463bf2a27b..4a20302989 100644
--- 
a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -25,6 +25,7 @@ import static 
org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -46,6 +47,7 @@ import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
@@ -1135,6 +1137,42 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
                         branch)));
   }
 
+  @Test
+  public void testDeleteToCustomWapBranchWithoutWhereClause() throws 
NoSuchTableException {
+    assumeThat(branch) // skip unless a non-main branch is under test
+        .as("Run only if custom WAP branch is not main")
+        .isNotNull()
+        .isNotEqualTo(SnapshotRef.MAIN_BRANCH);
+
+    createAndInitPartitionedTable();
+    sql(
+        "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+        tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED); // turn on WAP for the table
+    append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new 
Employee(2, "hr")); // seed three rows before the branch is created
+    createBranchIfNeeded();
+
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
+        () -> {
+          sql("DELETE FROM %s t WHERE id=1", tableName); // first: delete with a predicate
+          Assertions.assertThat(spark.table(tableName).count()).isEqualTo(2L);
+          Assertions.assertThat(spark.table(tableName + ".branch_" + 
branch).count()).isEqualTo(2L);
+          Assertions.assertThat(spark.table(tableName + 
".branch_main").count())
+              .as("Should not modify main branch")
+              .isEqualTo(3L);
+        });
+    withSQLConf(
+        ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
+        () -> {
+          sql("DELETE FROM %s t", tableName); // second: delete with no WHERE clause, the case this test is named for
+          Assertions.assertThat(spark.table(tableName).count()).isEqualTo(0L);
+          Assertions.assertThat(spark.table(tableName + ".branch_" + 
branch).count()).isEqualTo(0L);
+          Assertions.assertThat(spark.table(tableName + 
".branch_main").count())
+              .as("Should not modify main branch")
+              .isEqualTo(3L);
+        });
+  }
+
   // TODO: multiple stripes for ORC
 
   protected void createAndInitPartitionedTable() {
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 90bdbfc1d9..03255e6a7d 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.ValidationException;
 import org.apache.iceberg.hadoop.HadoopFileIO;
 import org.apache.iceberg.hadoop.SerializableConfiguration;
 import org.apache.iceberg.hadoop.Util;
@@ -715,6 +716,43 @@ public class SparkTableUtil {
         spark, DataSourceV2Relation.create(metadataTable, Some.empty(), 
Some.empty(), options));
   }
 
+  /**
+   * Determines the write branch, validating the session's WAP ID and WAP branch settings.
+   *
+   * <p>The checks fail if both WAP settings are set, or a WAP branch is set with a non-null branch.
+   *
+   * @param spark Spark session whose conf supplies the WAP ID and WAP branch
+   * @param branch requested write branch, or null; returned when no WAP branch is configured
+   * @return the configured WAP branch if present, otherwise {@code branch} (may be null)
+   */
+  public static String determineWriteBranch(SparkSession spark, String branch) 
{
+    String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
+    String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
+    ValidationException.check(
+        wapId == null || wapBranch == null,
+        "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+        wapId,
+        wapBranch);
+
+    if (wapBranch != null) {
+      ValidationException.check(
+          branch == null,
+          "Cannot write to both branch and WAP branch, but got branch [%s] and 
WAP branch [%s]",
+          branch,
+          wapBranch);
+
+      return wapBranch;
+    }
+    return branch;
+  }
+
+  public static boolean wapEnabled(Table table) { // true when the table's WAP-enabled property resolves to true
+    return PropertyUtil.propertyAsBoolean(
+        table.properties(),
+        TableProperties.WRITE_AUDIT_PUBLISH_ENABLED,
+        Boolean.parseBoolean(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT)); // parse the constant's text; Boolean.getBoolean would look up a JVM system property of that name
+  }
+
   /** Class representing a table partition. */
   public static class SparkPartition implements Serializable {
     private final Map<String, String> values;
diff --git 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d845284513..60e1eb36fe 100644
--- 
a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ 
b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkFilters;
 import org.apache.iceberg.spark.SparkReadOptions;
 import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
 import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;
@@ -371,6 +372,10 @@ public class SparkTable
             .set("spark.app.id", sparkSession().sparkContext().applicationId())
             .deleteFromRowFilter(deleteExpr);
 
+    if (SparkTableUtil.wapEnabled(table())) {
+      branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch);
+    }
+
     if (branch != null) {
       deleteFiles.toBranch(branch);
     }

Reply via email to