This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 057f887713 Docs, Spark: Distribution mode not respected for CTAS/RTAS before 3.5.0 (#9439)
057f887713 is described below
commit 057f8877136276b3a0c8c01551f733e804fc42a3
Author: Manu Zhang <[email protected]>
AuthorDate: Fri Jan 19 00:40:34 2024 +0800
Docs, Spark: Distribution mode not respected for CTAS/RTAS before 3.5.0 (#9439)
---
docs/spark-writes.md | 5 ++--
.../iceberg/spark/sql/TestCreateTableAsSelect.java | 16 ++++++++++++-
.../iceberg/spark/sql/TestCreateTableAsSelect.java | 27 +++++++++++++++++++++-
3 files changed, 44 insertions(+), 4 deletions(-)
diff --git a/docs/spark-writes.md b/docs/spark-writes.md
index 7338af0c88..b943567460 100644
--- a/docs/spark-writes.md
+++ b/docs/spark-writes.md
@@ -343,7 +343,8 @@ data.writeTo("prod.db.sample").option("mergeSchema","true").append()
Iceberg's default Spark writers require that the data in each spark task is clustered by partition values. This
distribution is required to minimize the number of file handles that are held open while writing. By default, starting
in Iceberg 1.2.0, Iceberg also requests that Spark pre-sort data to be written to fit this distribution. The
-request to Spark is done through the table property `write.distribution-mode` with the value `hash`.
+request to Spark is done through the table property `write.distribution-mode` with the value `hash`. Spark doesn't respect
+distribution mode in CTAS/RTAS before 3.5.0.
Let's go through writing the data against below sample table:
@@ -380,7 +381,7 @@ write data before writing.
Practically, this means that each row is hashed based on the row's partition value and then placed
in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
[Spark's Adaptive Query planning](#controlling-file-sizes).
-* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing.
+* `range` - This mode requests that Spark perform a range based exchange to shuffle the data before writing.
This is a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to
be written based on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark
tasks. Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 1c08b1fd5a..74906241fa 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -29,6 +29,8 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.spark.SparkCatalogTestBase;
import org.apache.iceberg.types.Types;
+import org.apache.spark.SparkException;
+import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -46,7 +48,7 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
+ "USING iceberg PARTITIONED BY (truncate(id, 3))",
sourceName);
- sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
}
@After
@@ -102,6 +104,18 @@ public class TestCreateTableAsSelect extends SparkCatalogTestBase {
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+ @Test
+ public void testCTASWriteDistributionModeNotRespected() {
+ Assertions.assertThatThrownBy(
+ () ->
+ sql(
+ "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+ tableName, sourceName))
+ .isInstanceOf(SparkException.class)
+ .hasMessageContaining(
+ "Incoming records violate the writer assumption that records are clustered by spec and by partition within each spec");
+ }
+
@Test
public void testRTAS() {
sql(
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
index 81b1939423..4098a155be 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestCreateTableAsSelect.java
@@ -70,7 +70,7 @@ public class TestCreateTableAsSelect extends CatalogTestBase {
"CREATE TABLE IF NOT EXISTS %s (id bigint NOT NULL, data string) "
+ "USING iceberg PARTITIONED BY (truncate(id, 3))",
sourceName);
- sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c')", sourceName);
+ sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b'), (3, 'c'), (4, 'd'), (5, 'e')", sourceName);
}
@AfterEach
@@ -125,6 +125,31 @@ public class TestCreateTableAsSelect extends CatalogTestBase {
sql("SELECT * FROM %s ORDER BY id", tableName));
}
+ @TestTemplate
+ public void testCTASWriteDistributionModeRespected() {
+ sql(
+ "CREATE TABLE %s USING iceberg PARTITIONED BY (bucket(2, id)) AS SELECT * FROM %s",
+ tableName, sourceName);
+
+ Schema expectedSchema =
+ new Schema(
+ Types.NestedField.optional(1, "id", Types.LongType.get()),
+ Types.NestedField.optional(2, "data", Types.StringType.get()));
+
+ PartitionSpec expectedSpec = PartitionSpec.builderFor(expectedSchema).bucket("id", 2).build();
+
+ Table ctasTable = validationCatalog.loadTable(tableIdent);
+
+ assertThat(ctasTable.schema().asStruct())
+ .as("Should have expected nullable schema")
+ .isEqualTo(expectedSchema.asStruct());
+ assertThat(ctasTable.spec()).as("Should be partitioned by id").isEqualTo(expectedSpec);
+ assertEquals(
+ "Should have rows matching the source table",
+ sql("SELECT * FROM %s ORDER BY id", sourceName),
+ sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
@TestTemplate
public void testRTAS() {
sql(