This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 057f887713 Docs, Spark: Distribution mode not respected for CTAS/RTAS before 3.5.0 (#9439)
057f887713 is described below

commit 057f8877136276b3a0c8c01551f733e804fc42a3
Author: Manu Zhang <[email protected]>
AuthorDate: Fri Jan 19 00:40:34 2024 +0800

    Docs, Spark: Distribution mode not respected for CTAS/RTAS before 3.5.0 (#9439)
---
 docs/spark-writes.md                               |  5 ++--
 .../iceberg/spark/sql/TestCreateTableAsSelect.java | 16 ++++++++++++-
 .../iceberg/spark/sql/TestCreateTableAsSelect.java | 27 +++++++++++++++++++++-
 3 files changed, 44 insertions(+), 4 deletions(-)

diff --git a/docs/spark-writes.md b/docs/spark-writes.md
index 7338af0c88..b943567460 100644
--- a/docs/spark-writes.md
+++ b/docs/spark-writes.md
@@ -343,7 +343,8 @@ data.writeTo("prod.db.sample").option("mergeSchema","true").append()
 Iceberg's default Spark writers require that the data in each spark task is clustered by partition values. This 
 distribution is required to minimize the number of file handles that are held open while writing. By default, starting
 in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written to fit this distribution. The
-request to Spark is done through the table property `write.distribution-mode` with the value `hash`.
+request to Spark is done through the table property `write.distribution-mode` with the value `hash`. Spark doesn't respect
+distribution mode in CTAS/RTAS before 3.5.0.
 
 Let's go through writing the data against below sample table:
 
@@ -380,7 +381,7 @@ write data before writing.
 Practically, this means that each row is hashed based on the row's partition value and then placed
 in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
 [Spark's Adaptive Query planning](#controlling-file-sizes).
-* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.  
+* `range` - This mode requests that Spark perform a range based exchange to shuffle the data before writing.  
 This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to 
 be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark 
 tasks. Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.  
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 1c08b1fd5a..74906241fa 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.spark.SparkCatalogTestBase;
 import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
+import org.assertj.core.api.Assertions;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -46,7 +48,7 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
             + "USING iceberg PARTITIONED BY (truncate(id, 3))",
         sourceName);
-    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
   }
 
   @After
@@ -102,6 +104,18 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @Test
+  public void testCTASWriteDistributionModeNotRespected() {
+    Assertions.assertThatThrownBy(
+            () ->
+                sql(
+                    "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+                    tableName, sourceName))
+        .isInstanceOf(SparkException.class)
+        .hasMessageContaining(
+            "Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec");
+  }
+
   @Test
   public void testRTAS() {
     sql(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 81b1939423..4098a155be 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -70,7 +70,7 @@ public class TestCreateTableAsSelect extends CatalogTestBase {
         "CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
             + "USING iceberg PARTITIONED BY (truncate(id, 3))",
         sourceName);
-    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
   }
 
   @AfterEach
@@ -125,6 +125,31 @@ public class TestCreateTableAsSelect extends CatalogTestBase {
         sql("SELECT * FROM %s ORDER BY id", tableName));
   }
 
+  @TestTemplate
+  public void testCTASWriteDistributionModeRespected() {
+    sql(
+        "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+        tableName, sourceName);
+
+    Schema expectedSchema =
+        new Schema(
+            Types.NestedField.optional(1, "id", Types.LongType.get()),
+            Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+    PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema).bucket("id", 2).build();
+
+    Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+    assertThat(ctasTable.schema().asStruct())
+        .as("Should have expected nullable schema")
+        .isEqualTo(expectedSchema.asStruct());
+    assertThat(ctasTable.spec()).as("Should be partitioned by id").isEqualTo(expectedSpec);
+    assertEquals(
+        "Should have rows matching the source table",
+        sql("SELECT * FROM %s ORDER BY id", sourceName),
+        sql("SELECT * FROM %s ORDER BY id", tableName));
+  }
+
   @TestTemplate
   public void testRTAS() {
     sql(

Reply via email to