This is an automated email from the ASF dual-hosted git repository.
russellspitzer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 32e9f40468 Spark 3.5: Don't change table distribution when only altering local order (#10774)
32e9f40468 is described below
commit 32e9f40468756a60d2cc52e2b9e951209268e94b
Author: Manu Zhang <[email protected]>
AuthorDate: Sat Oct 26 03:28:51 2024 +0800
Spark 3.5: Don't change table distribution when only altering local order (#10774)
---
.../IcebergSqlExtensionsAstBuilder.scala | 10 +++++---
.../v2/SetWriteDistributionAndOrderingExec.scala | 10 +++++---
.../TestSetWriteDistributionAndOrdering.java | 29 ++++++++++++++++++++--
.../logical/SetWriteDistributionAndOrdering.scala | 2 +-
4 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
index 2e438de2b8..6b1cc41da0 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSqlExtensionsAstBuilder.scala
@@ -226,11 +226,13 @@ class IcebergSqlExtensionsAstBuilder(delegate: ParserInterface) extends IcebergS
}
val distributionMode = if (distributionSpec != null) {
- DistributionMode.HASH
- } else if (orderingSpec.UNORDERED != null || orderingSpec.LOCALLY != null) {
- DistributionMode.NONE
+ Some(DistributionMode.HASH)
+ } else if (orderingSpec.UNORDERED != null) {
+ Some(DistributionMode.NONE)
+ } else if (orderingSpec.LOCALLY() != null) {
+ None
} else {
- DistributionMode.RANGE
+ Some(DistributionMode.RANGE)
}
val ordering = if (orderingSpec != null && orderingSpec.order != null) {
diff --git a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala
index feecc02350..c9004ddc5b 100644
--- a/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala
+++ b/spark/v3.5/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetWriteDistributionAndOrderingExec.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.connector.catalog.TableCatalog
case class SetWriteDistributionAndOrderingExec(
catalog: TableCatalog,
ident: Identifier,
- distributionMode: DistributionMode,
+ distributionMode: Option[DistributionMode],
sortOrder: Seq[(Term, SortDirection, NullOrder)]) extends LeafV2CommandExec {
import CatalogV2Implicits._
@@ -56,9 +56,11 @@ case class SetWriteDistributionAndOrderingExec(
}
orderBuilder.commit()
- txn.updateProperties()
- .set(WRITE_DISTRIBUTION_MODE, distributionMode.modeName())
- .commit()
+ distributionMode.foreach { mode =>
+ txn.updateProperties()
+ .set(WRITE_DISTRIBUTION_MODE, mode.modeName())
+ .commit()
+ }
txn.commitTransaction()
diff --git a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java
index 77b7797fe1..b8547772da 100644
--- a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java
+++ b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestSetWriteDistributionAndOrdering.java
@@ -200,8 +200,7 @@ public class TestSetWriteDistributionAndOrdering extends ExtensionsTestBase {
table.refresh();
- String distributionMode = table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE);
- assertThat(distributionMode).as("Distribution mode must match").isEqualTo("none");
+ assertThat(table.properties().containsKey(TableProperties.WRITE_DISTRIBUTION_MODE)).isFalse();
SortOrder expected =
SortOrder.builderFor(table.schema())
@@ -213,6 +212,25 @@ public class TestSetWriteDistributionAndOrdering extends ExtensionsTestBase {
assertThat(table.sortOrder()).as("Sort order must match").isEqualTo(expected);
}
+ @TestTemplate
+ public void testSetWriteLocallyOrderedToPartitionedTable() {
+ sql(
+ "CREATE TABLE %s (id bigint NOT NULL, category string) USING iceberg PARTITIONED BY (id)",
+ tableName);
+ Table table = validationCatalog.loadTable(tableIdent);
+ assertThat(table.sortOrder().isUnsorted()).as("Table should start unsorted").isTrue();
+
+ sql("ALTER TABLE %s WRITE LOCALLY ORDERED BY category DESC", tableName);
+
+ table.refresh();
+
+ assertThat(table.properties().containsKey(TableProperties.WRITE_DISTRIBUTION_MODE)).isFalse();
+
+ SortOrder expected =
+ SortOrder.builderFor(table.schema()).withOrderId(1).desc("category").build();
+ assertThat(table.sortOrder()).as("Sort order must match").isEqualTo(expected);
+ }
+
@TestTemplate
public void testSetWriteDistributedByWithSort() {
sql(
@@ -249,6 +267,13 @@ public class TestSetWriteDistributionAndOrdering extends ExtensionsTestBase {
SortOrder expected =
SortOrder.builderFor(table.schema()).withOrderId(1).asc("id").build();
assertThat(table.sortOrder()).as("Sort order must match").isEqualTo(expected);
+
+ sql("ALTER TABLE %s WRITE LOCALLY ORDERED BY id", tableName);
+
+ table.refresh();
+
+ String newDistributionMode = table.properties().get(TableProperties.WRITE_DISTRIBUTION_MODE);
+ assertThat(newDistributionMode).as("Distribution mode must match").isEqualTo(distributionMode);
}
@TestTemplate
diff --git a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala
index 0a0234cdfe..7b599eb3da 100644
--- a/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala
+++ b/spark/v3.5/spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/SetWriteDistributionAndOrdering.scala
@@ -28,7 +28,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits
case class SetWriteDistributionAndOrdering(
table: Seq[String],
- distributionMode: DistributionMode,
+ distributionMode: Option[DistributionMode],
sortOrder: Seq[(Term, SortDirection, NullOrder)]) extends LeafCommand {
import CatalogV2Implicits._