This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 7fa5fca9b1 Spark 3.4: Handle skew in writes (#7520)
7fa5fca9b1 is described below

commit 7fa5fca9b1121950f03dce6d8b985b283dc256b6
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu May 4 14:25:12 2023 -0700

    Spark 3.4: Handle skew in writes (#7520)
---
 .../iceberg/spark/extensions/TestDelete.java       | 67 +++++++++++++++++++
 .../apache/iceberg/spark/extensions/TestMerge.java | 78 ++++++++++++++++++++++
 .../iceberg/spark/extensions/TestUpdate.java       | 69 +++++++++++++++++++
 .../spark/source/SparkPositionDeltaWrite.java      |  5 ++
 .../apache/iceberg/spark/source/SparkWrite.java    |  5 ++
 5 files changed, 224 insertions(+)

diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
index 6fa8d7f965..336d40cca0 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestDelete.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.DELETE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.DELETE_MODE;
 import static org.apache.iceberg.TableProperties.DELETE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.spark.sql.functions.lit;
 
@@ -43,9 +45,11 @@ import java.util.stream.Collectors;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.SnapshotSummary;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.data.GenericRecord;
@@ -68,6 +72,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException;
 import org.apache.spark.sql.catalyst.plans.logical.DeleteFromTableWithFilters;
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan;
 import org.apache.spark.sql.catalyst.plans.logical.RowLevelWrite;
+import org.apache.spark.sql.execution.SparkPlan;
 import 
org.apache.spark.sql.execution.datasources.v2.OptimizeMetadataOnlyDeleteFromTable;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
@@ -103,6 +108,68 @@ public abstract class TestDelete extends 
SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS parquet_table");
   }
 
+  @Test
+  public void testSkewDelete() throws Exception {
+    createAndInitPartitionedTable();
+
+    Employee[] employees = new Employee[100];
+    for (int index = 0; index < 100; index++) {
+      employees[index] = new Employee(index, "hr");
+    }
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+    append(tableName, employees);
+
+    // set the open file cost large enough to produce a separate scan task per 
file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            DELETE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, 
tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger 
a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("DELETE FROM %s WHERE mod(id, 2) = 0", 
commitTarget());
+          
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW DELETE requests the remaining records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle 
block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 
independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR DELETE requests the deleted records to be clustered by `_spec_id` 
and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 
shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same 
reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 
separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match", 200L, scalarSql("SELECT COUNT(*) FROM %s", 
commitTarget()));
+  }
+
   @Test
   public void testDeleteWithoutScanningTable() throws Exception {
     createAndInitPartitionedTable();
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
index 4ec78ec385..18d42ca6ae 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
@@ -19,10 +19,12 @@
 package org.apache.iceberg.spark.extensions;
 
 import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE;
+import static org.apache.iceberg.TableProperties.MERGE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.MERGE_MODE;
 import static org.apache.iceberg.TableProperties.MERGE_MODE_DEFAULT;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
 import static org.apache.iceberg.TableProperties.WRITE_DISTRIBUTION_MODE;
 import static org.apache.spark.sql.functions.lit;
@@ -95,6 +97,82 @@ public abstract class TestMerge extends 
SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS source");
   }
 
+  @Test
+  public void testSkewMerge() {
+    createAndInitTable("id INT, salary INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"salary\": 100, \"dep\": 
\"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per 
file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            MERGE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, 
tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    spark.range(0, 100).createOrReplaceTempView("source");
+
+    // enable AQE and set the advisory partition size small enough to trigger 
a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    // set the min coalesce partition size small enough to avoid coalescing
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "4",
+            SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE().key(), "100",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan(
+                  "MERGE INTO %s t USING source "
+                      + "ON t.id = source.id "
+                      + "WHEN MATCHED THEN "
+                      + "  UPDATE SET salary = -1 ",
+                  commitTarget());
+          
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot currentSnapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW MERGE would perform a join on `id` and then cluster records by 
`dep`
+      // the first shuffle distributes records into 4 shuffle partitions so 
that rows can be merged
+      // after existing and new rows are merged, the data is clustered by `dep`
+      // each task with merged data contains records for the same table 
partition
+      // that means there are 4 shuffle blocks, all assigned to the same 
reducer
+      // AQE detects that all shuffle blocks are big and processes them in 4 
independent tasks
+      // otherwise, there would be 1 task processing all 4 shuffle blocks
+      validateProperty(currentSnapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR MERGE would perform a join on `id` and then cluster data based on 
the partition
+      // all tasks belong to the same partition and therefore write only 1 
shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same 
reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 
separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(currentSnapshot, 
SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        400L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE salary = -1", 
commitTarget()));
+  }
+
   @Test
   public void testMergeConditionSplitIntoTargetPredicateAndJoinCondition() {
     createAndInitTable(
diff --git 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
index 776fbb9600..ccfd83c733 100644
--- 
a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
+++ 
b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java
@@ -24,7 +24,9 @@ import static 
org.apache.iceberg.SnapshotSummary.ADDED_FILES_PROP;
 import static org.apache.iceberg.SnapshotSummary.CHANGED_PARTITION_COUNT_PROP;
 import static org.apache.iceberg.SnapshotSummary.DELETED_FILES_PROP;
 import static org.apache.iceberg.TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST;
 import static org.apache.iceberg.TableProperties.SPLIT_SIZE;
+import static org.apache.iceberg.TableProperties.UPDATE_DISTRIBUTION_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_ISOLATION_LEVEL;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE;
 import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
@@ -44,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.AssertHelpers;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DistributionMode;
 import org.apache.iceberg.RowLevelOperationMode;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotSummary;
@@ -64,6 +67,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.execution.SparkPlan;
 import org.apache.spark.sql.internal.SQLConf;
 import org.assertj.core.api.Assertions;
 import org.junit.After;
@@ -98,6 +102,71 @@ public abstract class TestUpdate extends 
SparkRowLevelOperationsTestBase {
     sql("DROP TABLE IF EXISTS deleted_employee");
   }
 
+  @Test
+  public void testSkewUpdate() {
+    createAndInitTable("id INT, dep STRING");
+    sql("ALTER TABLE %s ADD PARTITION FIELD dep", tableName);
+
+    String[] records = new String[100];
+    for (int index = 0; index < 100; index++) {
+      records[index] = String.format("{ \"id\": %d, \"dep\": \"hr\" }", index);
+    }
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+    append(tableName, records);
+
+    // set the open file cost large enough to produce a separate scan task per 
file
+    // use hash distribution to trigger a shuffle
+    Map<String, String> tableProps =
+        ImmutableMap.of(
+            SPLIT_OPEN_FILE_COST,
+            String.valueOf(Integer.MAX_VALUE),
+            UPDATE_DISTRIBUTION_MODE,
+            DistributionMode.HASH.modeName());
+    sql("ALTER TABLE %s SET TBLPROPERTIES (%s)", tableName, 
tablePropsAsString(tableProps));
+
+    createBranchIfNeeded();
+
+    // enable AQE and set the advisory partition size small enough to trigger 
a split
+    // set the number of shuffle partitions to 2 to only have 2 reducers
+    withSQLConf(
+        ImmutableMap.of(
+            SQLConf.SHUFFLE_PARTITIONS().key(), "2",
+            SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), "true",
+            
SQLConf.ADAPTIVE_OPTIMIZE_SKEWS_IN_REBALANCE_PARTITIONS_ENABLED().key(), "true",
+            SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES().key(), "100"),
+        () -> {
+          SparkPlan plan =
+              executeAndKeepPlan("UPDATE %s SET id = -1 WHERE mod(id, 2) = 0", 
commitTarget());
+          
Assertions.assertThat(plan.toString()).contains("REBALANCE_PARTITIONS_BY_COL");
+        });
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snapshot = SnapshotUtil.latestSnapshot(table, branch);
+
+    if (mode(table) == COPY_ON_WRITE) {
+      // CoW UPDATE requests the updated records to be clustered by `_file`
+      // each task contains only 1 file and therefore writes only 1 shuffle 
block
+      // that means 4 shuffle blocks are distributed among 2 reducers
+      // AQE detects that all shuffle blocks are big and processes them in 4 
independent tasks
+      // otherwise, there would be 2 tasks processing 2 shuffle blocks each
+      validateProperty(snapshot, SnapshotSummary.ADDED_FILES_PROP, "4");
+    } else {
+      // MoR UPDATE requests the deleted records to be clustered by `_spec_id` 
and `_partition`
+      // all tasks belong to the same partition and therefore write only 1 
shuffle block per task
+      // that means there are 4 shuffle blocks, all assigned to the same 
reducer
+      // AQE detects that all 4 shuffle blocks are big and processes them in 4 
separate tasks
+      // otherwise, there would be 1 task processing 4 shuffle blocks
+      validateProperty(snapshot, SnapshotSummary.ADDED_DELETE_FILES_PROP, "4");
+    }
+
+    Assert.assertEquals(
+        "Row count must match",
+        200L,
+        scalarSql("SELECT COUNT(*) FROM %s WHERE id = -1", commitTarget()));
+  }
+
   @Test
   public void testExplain() {
     createAndInitTable("id INT, dep STRING");
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 74d46339ee..416eb6a9ee 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -135,6 +135,11 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;
+  }
+
   @Override
   public SortOrder[] requiredOrdering() {
     return requiredOrdering;
diff --git 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index a080fcead1..9e3a15e738 100644
--- 
a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -137,6 +137,11 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
     return requiredDistribution;
   }
 
+  @Override
+  public boolean distributionStrictlyRequired() {
+    return false;
+  }
+
   @Override
   public SortOrder[] requiredOrdering() {
     return requiredOrdering;

Reply via email to