dramaticlly commented on code in PR #7499:
URL: https://github.com/apache/iceberg/pull/7499#discussion_r1187709725
##########
docs/spark-writes.md:
##########
@@ -339,74 +331,60 @@ USING iceberg
PARTITIONED BY (days(ts), category)
```
-To write data to the sample table, your data needs to be sorted by `days(ts), category`.
-
-If you're inserting data with SQL statement, you can use `ORDER BY` to achieve it, like below:
+To write data to the sample table, your data needs to be sorted by `days(ts), category` but this is taken care
+of automatically by the default `hash` distribution. Previously this would have required manually sorting, but this
+is no longer the case.
```sql
INSERT INTO prod.db.sample
SELECT id, data, category, ts FROM another_table
-ORDER BY ts, category
-```
-
-If you're inserting data with DataFrame, you can use either `orderBy`/`sort` to trigger global sort, or `sortWithinPartitions`
-to trigger local sort. Local sort for example:
-
-```scala
-data.sortWithinPartitions("ts", "category")
- .writeTo("prod.db.sample")
- .append()
```
-You can simply add the original column to the sort condition for the most partition transformations, except `bucket`.
-
-For `bucket` partition transformation, you need to register the Iceberg transform function in Spark to specify it during sort.
-
-Let's go through another sample table having bucket partition:
-
-```sql
-CREATE TABLE prod.db.sample (
- id bigint,
- data string,
- category string,
- ts timestamp)
-USING iceberg
-PARTITIONED BY (bucket(16, id))
-```
-
-You need to register the function to deal with bucket, like below:
-
-```scala
-import org.apache.iceberg.spark.IcebergSpark
-import org.apache.spark.sql.types.DataTypes
-
-IcebergSpark.registerBucketUDF(spark, "iceberg_bucket16", DataTypes.LongType, 16)
-```
-
-{{< hint info >}}
-Explicit registration of the function is necessary because Spark doesn't allow Iceberg to provide functions.
-[SPARK-27658](https://issues.apache.org/jira/browse/SPARK-27658) is filed to enable Iceberg to provide functions
-which can be used in query.
-{{< /hint >}}
-
-Here we just registered the bucket function as `iceberg_bucket16`, which can be used in sort clause.
-
-If you're inserting data with SQL statement, you can use the function like below:
-
-```sql
-INSERT INTO prod.db.sample
-SELECT id, data, category, ts FROM another_table
-ORDER BY iceberg_bucket16(id)
-```
-
-If you're inserting data with DataFrame, you can use the function like below:
-
-```scala
-data.sortWithinPartitions(expr("iceberg_bucket16(id)"))
- .writeTo("prod.db.sample")
- .append()
-```
+There are 3 options for `write.distribution-mode`
+
+* `none` - This is the previous default for Iceberg.
+<p>This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done
Review Comment:
If `<p>` is used to start a new paragraph, do we need `</p>` to close it? I also can't find one on line 350 for the paragraph below.
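For illustration, one way the paragraphs could be closed explicitly (a sketch only, with the bullet text abbreviated; note that HTML also treats `</p>` as optional, so this may be mainly a style question):
```markdown
* `none` - This is the previous default for Iceberg.
  <p>This mode does not request any shuffles or sort to be performed automatically by Spark. ...</p>
  <p>A local sort can be avoided by using the Spark write fanout property. ...</p>
```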
##########
docs/spark-writes.md:
##########
@@ -339,74 +331,60 @@ USING iceberg
+There are 3 options for `write.distribution-mode`
+
+* `none` - This is the previous default for Iceberg.
+<p>This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done
+automatically by Spark, the data must be *manually* locally or globally sorted by partition value. To reduce the number
+of files produced during writing, using a global sort is recommended.
+<p>A local sort can be avoided by using the Spark [write fanout](#write-properties) property but this will cause all
+file handles to remain open until each write task has completed.
+* `hash` - This mode is the new default and requests that Spark uses a hash-based exchange to shuffle the incoming
+write data before writing. Practically, this means that each row is hashed based on the row's partition value and then placed
+in a corresponding Spark task based upon that value. Further division and coalescing of tasks may take place because of
+the [Spark's Adaptive Query planning](#controlling-file-sizes).
+* `range` - This mode requests that Spark perform a range based exchanged to shuffle the data before writing. This is
+a two stage procedure which is more expensive than the `hash` mode. The first stage samples the data to be written based
+on the partition and sort columns. The second stage uses the range information to shuffle the input data into Spark
+tasks. Each task gets an exclusive range of the input data which clusters the data by partition and also globally sorts.
+While this is more expensive than the hash distribution, the global ordering can be beneficial for read performance if
+sorted columns are used during queries. This mode is used by default if a table is created with a
+sort-order. Further division and coalescing of tasks may take place because of
+[Spark's Adaptive Query planning](#controlling-file-sizes).
Review Comment:
When reading the rendered markdown diff, I notice that the 3 modes are concatenated together and hard to read, as shown here:
<img width="991" alt="image" src="https://user-images.githubusercontent.com/5961173/236891432-bb4ffa91-6d05-46a3-89b3-fe7537ce1987.png">
Maybe you want to add a new line before `hash` and `range` on lines 352 and 356 so the list renders properly?
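For illustration, separating the items with blank lines should make them render as three distinct bullets (a sketch with the bullet text abbreviated):
```markdown
* `none` - This is the previous default for Iceberg. ...

* `hash` - This mode is the new default and requests that Spark uses a hash-based exchange ...

* `range` - This mode requests that Spark perform a range-based exchange ...
```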
##########
docs/spark-writes.md:
##########
@@ -339,74 +331,60 @@ USING iceberg
+* `none` - This is the previous default for Iceberg.
+<p>This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done
+automatically by Spark, the data must be *manually* locally or globally sorted by partition value. To reduce the number
Review Comment:
If you intend to italicize the word manually here using the markdown syntax `*manually*`, I think it might not work as intended within `<p>`. I think the version below will work. Please ignore this if you want a literal asterisk.
```markdown
<p>This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done
automatically by Spark, the data must be <em>manually</em> locally or globally sorted by partition value. To reduce the number
of files produced during writing, using a global sort is recommended.
```
##########
docs/spark-writes.md:
##########
@@ -339,74 +331,60 @@ USING iceberg
+* `none` - This is the previous default for Iceberg.
+<p>This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done
+automatically by Spark, the data must be *manually* locally or globally sorted by partition value. To reduce the number
Review Comment:
Also a nit: `the data must be manually locally or globally sorted by partition value` reads a bit awkwardly. Maybe:
```md
The data must be manually sorted by partition value. Sorting can be done locally or globally to reduce the number of files produced during writing; a global sort is recommended.
```
##########
docs/spark-writes.md:
##########
@@ -339,74 +331,60 @@ USING iceberg
+* `none` - This is the previous default for Iceberg.
+<p>This mode does not request any shuffles or sort to be performed automatically by Spark. Because no work is done
+automatically by Spark, the data must be *manually* locally or globally sorted by partition value. To reduce the number
+of files produced during writing, using a global sort is recommended.
+<p>A local sort can be avoided by using the Spark [write fanout](#write-properties) property but this will cause all
Review Comment:
The hyperlink `[write fanout](#write-properties)` also does not seem to work inside `<p>`; it might need HTML syntax instead, like `<a href="url">link text</a>`.
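For example, a plain-HTML version of the link might look like this (a sketch; it assumes the `#write-properties` anchor exists on the rendered page):
```markdown
<p>A local sort can be avoided by using the Spark <a href="#write-properties">write fanout</a> property
but this will cause all file handles to remain open until each write task has completed.
```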
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]