This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch 1.3.x
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/1.3.x by this push:
new 57bfde48cc Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (#8033) (#8036)
57bfde48cc is described below
commit 57bfde48ccb2735139c1fc2637f5f6710365beb6
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Tue Jul 11 19:30:22 2023 +0200
Spark 3.3: Backport 'WAP branch not propagated when using DELETE without WHERE' (#8033) (#8036)
---
.../iceberg/spark/extensions/TestDelete.java | 38 ++++++++++++++++++++++
.../org/apache/iceberg/spark/SparkTableUtil.java | 38 ++++++++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 5 +++
3 files changed, 81 insertions(+)
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 463bf2a27b..4a20302989 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -25,6 +25,7 @@ import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
import static org.apache.spark.sql.functions.lit;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.util.Arrays;
import java.util.Collections;
@@ -46,6 +47,7 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotRef;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
@@ -1135,6 +1137,42 @@ public abstract class TestDelete extends SparkRowLevelOperationsTestBase {
branch)));
}
+ @Test
+ public void testDeleteToCustomWapBranchWithoutWhereClause() throws NoSuchTableException {
+ assumeThat(branch)
+ .as("Run only if custom WAP branch is not main")
+ .isNotNull()
+ .isNotEqualTo(SnapshotRef.MAIN_BRANCH);
+
+ createAndInitPartitionedTable();
+ sql(
+ "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true')",
+ tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
+ append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
+ createBranchIfNeeded();
+
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
+ () -> {
+ sql("DELETE FROM %s t WHERE id=1", tableName);
+ Assertions.assertThat(spark.table(tableName).count()).isEqualTo(2L);
+ Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(2L);
+ Assertions.assertThat(spark.table(tableName + ".branch_main").count())
+ .as("Should not modify main branch")
+ .isEqualTo(3L);
+ });
+ withSQLConf(
+ ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
+ () -> {
+ sql("DELETE FROM %s t", tableName);
+ Assertions.assertThat(spark.table(tableName).count()).isEqualTo(0L);
+ Assertions.assertThat(spark.table(tableName + ".branch_" + branch).count()).isEqualTo(0L);
+ Assertions.assertThat(spark.table(tableName + ".branch_main").count())
+ .as("Should not modify main branch")
+ .isEqualTo(3L);
+ });
+ }
+
// TODO: multiple stripes for ORC
protected void createAndInitPartitionedTable() {
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
index 90bdbfc1d9..03255e6a7d 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/SparkTableUtil.java
@@ -44,6 +44,7 @@ import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.TableMigrationUtil;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.hadoop.SerializableConfiguration;
import org.apache.iceberg.hadoop.Util;
@@ -715,6 +716,43 @@ public class SparkTableUtil {
spark, DataSourceV2Relation.create(metadataTable, Some.empty(), Some.empty(), options));
}
+ /**
+ * Determine the write branch.
+ *
+ * <p>Validate wap config and determine the write branch.
+ *
+ * @param spark a Spark Session
+ * @param branch write branch if there is no WAP branch configured
+ * @return branch for write operation
+ */
+ public static String determineWriteBranch(SparkSession spark, String branch) {
+ String wapId = spark.conf().get(SparkSQLProperties.WAP_ID, null);
+ String wapBranch = spark.conf().get(SparkSQLProperties.WAP_BRANCH, null);
+ ValidationException.check(
+ wapId == null || wapBranch == null,
+ "Cannot set both WAP ID and branch, but got ID [%s] and branch [%s]",
+ wapId,
+ wapBranch);
+
+ if (wapBranch != null) {
+ ValidationException.check(
+ branch == null,
+ "Cannot write to both branch and WAP branch, but got branch [%s] and WAP branch [%s]",
+ branch,
+ wapBranch);
+
+ return wapBranch;
+ }
+ return branch;
+ }
+
+ public static boolean wapEnabled(Table table) {
+ return PropertyUtil.propertyAsBoolean(
+ table.properties(),
+ TableProperties.WRITE_AUDIT_PUBLISH_ENABLED,
+ Boolean.getBoolean(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED_DEFAULT));
+ }
+
/** Class representing a table partition. */
public static class SparkPartition implements Serializable {
private final Map<String, String> values;
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index d845284513..60e1eb36fe 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -56,6 +56,7 @@ import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkFilters;
import org.apache.iceberg.spark.SparkReadOptions;
import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.SparkTableUtil;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.SnapshotUtil;
@@ -371,6 +372,10 @@ public class SparkTable
.set("spark.app.id", sparkSession().sparkContext().applicationId())
.deleteFromRowFilter(deleteExpr);
+ if (SparkTableUtil.wapEnabled(table())) {
+ branch = SparkTableUtil.determineWriteBranch(sparkSession(), branch);
+ }
+
if (branch != null) {
deleteFiles.toBranch(branch);
}