This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new b7296a755b Spark 3.5: Use fanout writers for unsorted tables by default (#8621)
b7296a755b is described below

commit b7296a755ba9c3e9236a43f99225c580cd9abdc0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Sep 26 16:04:30 2023 -0700

    Spark 3.5: Use fanout writers for unsorted tables by default (#8621)
---
 .../TestRequiredDistributionAndOrdering.java       |   1 +
 .../iceberg/spark/extensions/TestWriteAborts.java  |   1 +
 .../org/apache/iceberg/spark/SparkWriteConf.java   |  13 +-
 .../spark/source/SparkPositionDeltaWrite.java      |  13 +-
 .../apache/iceberg/spark/source/SparkWrite.java    |  14 +-
 .../TestSparkDistributionAndOrderingUtil.java      | 409 +++++----------------
 .../iceberg/spark/source/TestFilteredScan.java     |  59 +--
 .../iceberg/spark/source/TestPartitionValues.java  |   4 +
 .../TestRequiredDistributionAndOrdering.java       |   1 +
 9 files changed, 143 insertions(+), 372 deletions(-)

diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
index fcdf9bf992..809de08379 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
@@ -195,6 +195,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkExtensionsTestBase
             inputDF
                 .writeTo(tableName)
                 .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, 
"false")
+                .option(SparkWriteOptions.FANOUT_ENABLED, "false")
                 .append();
           } catch (NoSuchTableException e) {
             throw new RuntimeException(e);
diff --git 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
index 15484f45f8..4d87099572 100644
--- 
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
+++ 
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
@@ -112,6 +112,7 @@ public class TestWriteAborts extends 
SparkExtensionsTestBase {
                     .sortWithinPartitions("id")
                     .writeTo(tableName)
                     
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                    .option(SparkWriteOptions.FANOUT_ENABLED, "false")
                     .append())
         .isInstanceOf(SparkException.class)
         .hasMessageContaining("Encountered records that belong to already 
closed files");
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 28a586e3e9..834e839874 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -172,12 +172,21 @@ public class SparkWriteConf {
         .parse();
   }
 
-  public boolean fanoutWriterEnabled() {
+  public boolean useFanoutWriter(SparkWriteRequirements writeRequirements) {
+    boolean defaultValue = !writeRequirements.hasOrdering();
+    return fanoutWriterEnabled(defaultValue);
+  }
+
+  private boolean fanoutWriterEnabled() {
+    return fanoutWriterEnabled(true /* enabled by default */);
+  }
+
+  private boolean fanoutWriterEnabled(boolean defaultValue) {
     return confParser
         .booleanConf()
         .option(SparkWriteOptions.FANOUT_ENABLED)
         .tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED)
-        
.defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)
+        .defaultValue(defaultValue)
         .parse();
   }
 
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 9fea33948b..a8145c2aba 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -406,11 +406,10 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
         Table table, SparkFileWriterFactory writers, OutputFileFactory files, 
Context context) {
 
       FileIO io = table.io();
-      boolean fanoutEnabled = context.fanoutWriterEnabled();
-      boolean inputOrdered = context.inputOrdered();
+      boolean useFanoutWriter = context.useFanoutWriter();
       long targetFileSize = context.targetDataFileSize();
 
-      if (table.spec().isPartitioned() && fanoutEnabled && !inputOrdered) {
+      if (table.spec().isPartitioned() && useFanoutWriter) {
         return new FanoutDataWriter<>(writers, files, io, targetFileSize);
       } else {
         return new ClusteredDataWriter<>(writers, files, io, targetFileSize);
@@ -670,7 +669,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
     private final FileFormat deleteFileFormat;
     private final long targetDeleteFileSize;
     private final String queryId;
-    private final boolean fanoutWriterEnabled;
+    private final boolean useFanoutWriter;
     private final boolean inputOrdered;
 
     Context(
@@ -687,7 +686,7 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
       this.metadataSparkType = info.metadataSchema().get();
       this.queryId = info.queryId();
-      this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
+      this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
       this.inputOrdered = writeRequirements.hasOrdering();
     }
 
@@ -723,8 +722,8 @@ class SparkPositionDeltaWrite implements DeltaWrite, 
RequiresDistributionAndOrde
       return queryId;
     }
 
-    boolean fanoutWriterEnabled() {
-      return fanoutWriterEnabled;
+    boolean useFanoutWriter() {
+      return useFanoutWriter;
     }
 
     boolean inputOrdered() {
diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 15881098e7..802c789ce8 100644
--- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++ 
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -98,7 +98,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
   private final Schema writeSchema;
   private final StructType dsSchema;
   private final Map<String, String> extraSnapshotMetadata;
-  private final boolean partitionedFanoutEnabled;
+  private final boolean useFanoutWriter;
   private final SparkWriteRequirements writeRequirements;
   private final Map<String, String> writeProperties;
 
@@ -126,7 +126,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
     this.writeSchema = writeSchema;
     this.dsSchema = dsSchema;
     this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
-    this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+    this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
     this.writeRequirements = writeRequirements;
     this.outputSpecId = writeConf.outputSpecId();
     this.writeProperties = writeConf.writeProperties();
@@ -188,7 +188,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
         targetFileSize,
         writeSchema,
         dsSchema,
-        partitionedFanoutEnabled,
+        useFanoutWriter,
         writeProperties);
   }
 
@@ -617,7 +617,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
     private final long targetFileSize;
     private final Schema writeSchema;
     private final StructType dsSchema;
-    private final boolean partitionedFanoutEnabled;
+    private final boolean useFanoutWriter;
     private final String queryId;
     private final Map<String, String> writeProperties;
 
@@ -629,7 +629,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
         long targetFileSize,
         Schema writeSchema,
         StructType dsSchema,
-        boolean partitionedFanoutEnabled,
+        boolean useFanoutWriter,
         Map<String, String> writeProperties) {
       this.tableBroadcast = tableBroadcast;
       this.format = format;
@@ -637,7 +637,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
       this.targetFileSize = targetFileSize;
       this.writeSchema = writeSchema;
       this.dsSchema = dsSchema;
-      this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+      this.useFanoutWriter = useFanoutWriter;
       this.queryId = queryId;
       this.writeProperties = writeProperties;
     }
@@ -678,7 +678,7 @@ abstract class SparkWrite implements Write, 
RequiresDistributionAndOrdering {
             writeSchema,
             dsSchema,
             targetFileSize,
-            partitionedFanoutEnabled);
+            useFanoutWriter);
       }
     }
   }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 79374edc3f..7ed34d4016 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -241,6 +241,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -252,23 +254,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
         };
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultWritePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
 
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
EMPTY_ORDERING);
   }
@@ -285,6 +272,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(WRITE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -296,27 +285,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
         };
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
expectedOrdering);
-  }
 
-  @Test
-  public void testHashWritePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
EMPTY_ORDERING);
   }
@@ -333,6 +303,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(WRITE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -342,31 +314,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
     Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
expectedOrdering);
-  }
-
-  @Test
-  public void testRangeWritePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
 
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-
-    Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
EMPTY_ORDERING);
   }
@@ -434,6 +383,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.replaceSortOrder().asc("id").commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -443,29 +394,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
     Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
expectedOrdering);
-  }
-
-  @Test
-  public void testRangeWritePartitionedSortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date)",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.replaceSortOrder().asc("id").commit();
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
-          Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
-        };
 
-    Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkWriteDistributionAndOrdering(table, expectedDistribution, 
expectedOrdering);
   }
@@ -642,6 +572,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -653,23 +585,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
         };
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, 
expectedDistribution, expectedOrdering);
-  }
 
-  @Test
-  public void testDefaultCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -686,6 +603,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -694,23 +613,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     checkCopyOnWriteDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
-  }
-
-  @Test
-  public void testNoneCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
 
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
@@ -728,6 +632,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -739,27 +645,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
         };
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testHashCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
 
-    table
-        .updateProperties()
-        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -776,6 +663,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -785,30 +674,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
     Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeCopyOnWriteDeletePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
 
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, DELETE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1086,6 +953,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -1097,23 +966,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
         };
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
 
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1130,6 +984,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -1138,23 +994,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     checkCopyOnWriteDistributionAndOrdering(
         table, UPDATE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
-  }
 
-  @Test
-  public void testNoneCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(
         table, UPDATE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
@@ -1172,6 +1013,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -1183,27 +1026,8 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
         };
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testHashCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
 
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1220,6 +1044,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -1229,30 +1055,8 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
     Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
 
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, UPDATE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1526,6 +1330,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -1537,23 +1343,8 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
         };
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testDefaultCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
 
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1570,6 +1361,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -1578,23 +1371,8 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
 
     checkCopyOnWriteDistributionAndOrdering(
         table, MERGE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
-  }
-
-  @Test
-  public void testNoneCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
 
-    table
-        .updateProperties()
-        .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
   }
@@ -1611,6 +1389,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
     Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
@@ -1622,27 +1402,8 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
         };
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
expectedDistribution, expectedOrdering);
-  }
 
-  @Test
-  public void testHashCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
-
-    Expression[] expectedClustering =
-        new Expression[] {Expressions.identity("date"), 
Expressions.days("ts")};
-    Distribution expectedDistribution = 
Distributions.clustered(expectedClustering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1659,6 +1420,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
@@ -1668,30 +1431,8 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
     Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
expectedDistribution, expectedOrdering);
-  }
-
-  @Test
-  public void testRangeCopyOnWriteMergePartitionedUnsortedTableFanout() {
-    sql(
-        "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
-            + "USING iceberg "
-            + "PARTITIONED BY (date, days(ts))",
-        tableName);
-
-    Table table = validationCatalog.loadTable(tableIdent);
-
-    table
-        .updateProperties()
-        .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
-        .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
-        .commit();
 
-    SortOrder[] expectedOrdering =
-        new SortOrder[] {
-          Expressions.sort(Expressions.column("date"), 
SortDirection.ASCENDING),
-          Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
-        };
-    Distribution expectedDistribution = 
Distributions.ordered(expectedOrdering);
+    enableFanoutWriters(table);
 
     checkCopyOnWriteDistributionAndOrdering(table, MERGE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -1838,6 +1579,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1858,6 +1601,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -1875,6 +1620,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1895,6 +1642,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     Distribution expectedDistribution = 
Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
 
     checkPositionDeltaDistributionAndOrdering(
@@ -1915,6 +1664,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1939,6 +1690,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, DELETE, UNSPECIFIED_DISTRIBUTION, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -1960,6 +1713,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         DELETE,
@@ -1984,6 +1739,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(DELETE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     Distribution expectedDistribution = 
Distributions.ordered(SPEC_ID_PARTITION_ORDERING);
 
     checkPositionDeltaDistributionAndOrdering(
@@ -2063,6 +1820,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         UPDATE,
@@ -2083,6 +1842,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, UPDATE, UNSPECIFIED_DISTRIBUTION, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -2100,6 +1861,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table,
         UPDATE,
@@ -2120,12 +1883,14 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     Distribution expectedDistribution = 
Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
 
     checkPositionDeltaDistributionAndOrdering(
         table, UPDATE, expectedDistribution, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
-    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
+    enableFanoutWriters(table);
 
     checkPositionDeltaDistributionAndOrdering(table, UPDATE, 
expectedDistribution, EMPTY_ORDERING);
   }
@@ -2263,6 +2028,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2306,6 +2073,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2341,6 +2110,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2384,6 +2155,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(UPDATE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedDistributionOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2647,6 +2420,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2671,6 +2446,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     checkPositionDeltaDistributionAndOrdering(
         table, MERGE, UNSPECIFIED_DISTRIBUTION, 
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
 
@@ -2688,6 +2465,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2712,6 +2491,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedDistributionOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2877,6 +2658,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     Table table = validationCatalog.loadTable(tableIdent);
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2919,6 +2702,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_NONE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -2954,6 +2739,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_HASH).commit();
 
+    disableFanoutWriters(table);
+
     Expression[] expectedClustering =
         new Expression[] {
           Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2996,6 +2783,8 @@ public class TestSparkDistributionAndOrderingUtil extends 
SparkTestBaseWithCatal
 
     table.updateProperties().set(MERGE_DISTRIBUTION_MODE, 
WRITE_DISTRIBUTION_MODE_RANGE).commit();
 
+    disableFanoutWriters(table);
+
     SortOrder[] expectedDistributionOrdering =
         new SortOrder[] {
           Expressions.sort(
@@ -3227,6 +3016,10 @@ public class TestSparkDistributionAndOrderingUtil 
extends SparkTestBaseWithCatal
     Assert.assertArrayEquals("Ordering must match", expectedOrdering, 
ordering);
   }
 
+  private void disableFanoutWriters(Table table) {
+    table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"false").commit();
+  }
+
   private void enableFanoutWriters(Table table) {
     table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, 
"true").commit();
   }
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index e8af5e51ec..0efec160e8 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -21,18 +21,13 @@ package org.apache.iceberg.spark.source;
 import static org.apache.iceberg.Files.localOutput;
 import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
 import static org.apache.iceberg.PlanningMode.LOCAL;
-import static 
org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
-import static org.apache.spark.sql.functions.callUDF;
-import static org.apache.spark.sql.functions.column;
 
 import java.io.File;
 import java.io.IOException;
-import java.sql.Timestamp;
 import java.time.OffsetDateTime;
 import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
-import java.util.function.Function;
 import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
@@ -52,14 +47,13 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
 import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
 import org.apache.iceberg.spark.data.GenericsHelpers;
-import org.apache.iceberg.transforms.Transforms;
 import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.api.java.UDF1;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.connector.expressions.filter.Predicate;
 import org.apache.spark.sql.connector.read.Batch;
@@ -73,9 +67,6 @@ import org.apache.spark.sql.sources.GreaterThan;
 import org.apache.spark.sql.sources.LessThan;
 import org.apache.spark.sql.sources.Not;
 import org.apache.spark.sql.sources.StringStartsWith;
-import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.StringType$;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
 import org.assertj.core.api.Assertions;
 import org.junit.AfterClass;
@@ -119,29 +110,6 @@ public class TestFilteredScan {
   @BeforeClass
   public static void startSpark() {
     TestFilteredScan.spark = 
SparkSession.builder().master("local[2]").getOrCreate();
-
-    // define UDFs used by partition tests
-    Function<Object, Integer> bucket4 = 
Transforms.bucket(4).bind(Types.LongType.get());
-    spark.udf().register("bucket4", (UDF1<Long, Integer>) bucket4::apply, 
IntegerType$.MODULE$);
-
-    Function<Object, Integer> day = 
Transforms.day().bind(Types.TimestampType.withZone());
-    spark
-        .udf()
-        .register(
-            "ts_day",
-            (UDF1<Timestamp, Integer>) timestamp -> 
day.apply(fromJavaTimestamp(timestamp)),
-            IntegerType$.MODULE$);
-
-    Function<Object, Integer> hour = 
Transforms.hour().bind(Types.TimestampType.withZone());
-    spark
-        .udf()
-        .register(
-            "ts_hour",
-            (UDF1<Timestamp, Integer>) timestamp -> 
hour.apply(fromJavaTimestamp(timestamp)),
-            IntegerType$.MODULE$);
-
-    spark.udf().register("data_ident", (UDF1<String, String>) data -> data, 
StringType$.MODULE$);
-    spark.udf().register("id_ident", (UDF1<Long, Long>) id -> id, 
LongType$.MODULE$);
   }
 
   @AfterClass
@@ -299,7 +267,7 @@ public class TestFilteredScan {
 
   @Test
   public void testBucketPartitionedIDFilters() {
-    Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID, 
"bucket4", "id");
+    Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
 
@@ -329,7 +297,7 @@ public class TestFilteredScan {
   @SuppressWarnings("checkstyle:AvoidNestedBlocks")
   @Test
   public void testDayPartitionedTimestampFilters() {
-    Table table = buildPartitionedTable("partitioned_by_day", 
PARTITION_BY_DAY, "ts_day", "ts");
+    Table table = buildPartitionedTable("partitioned_by_day", 
PARTITION_BY_DAY);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
     Batch unfiltered =
@@ -383,7 +351,7 @@ public class TestFilteredScan {
   @SuppressWarnings("checkstyle:AvoidNestedBlocks")
   @Test
   public void testHourPartitionedTimestampFilters() {
-    Table table = buildPartitionedTable("partitioned_by_hour", 
PARTITION_BY_HOUR, "ts_hour", "ts");
+    Table table = buildPartitionedTable("partitioned_by_hour", 
PARTITION_BY_HOUR);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
@@ -479,8 +447,7 @@ public class TestFilteredScan {
 
   @Test
   public void testPartitionedByDataStartsWithFilter() {
-    Table table =
-        buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, 
"data_ident", "data");
+    Table table = buildPartitionedTable("partitioned_by_data", 
PARTITION_BY_DATA);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
 
@@ -495,8 +462,7 @@ public class TestFilteredScan {
 
   @Test
   public void testPartitionedByDataNotStartsWithFilter() {
-    Table table =
-        buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA, 
"data_ident", "data");
+    Table table = buildPartitionedTable("partitioned_by_data", 
PARTITION_BY_DATA);
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
 
@@ -511,7 +477,7 @@ public class TestFilteredScan {
 
   @Test
   public void testPartitionedByIdStartsWith() {
-    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID, 
"id_ident", "id");
+    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
@@ -527,7 +493,7 @@ public class TestFilteredScan {
 
   @Test
   public void testPartitionedByIdNotStartsWith() {
-    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID, 
"id_ident", "id");
+    Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
 
     CaseInsensitiveStringMap options =
         new CaseInsensitiveStringMap(ImmutableMap.of("path", 
table.location()));
@@ -623,8 +589,7 @@ public class TestFilteredScan {
     
filterable.pushPredicates(Arrays.stream(filters).map(Filter::toV2).toArray(Predicate[]::new));
   }
 
-  private Table buildPartitionedTable(
-      String desc, PartitionSpec spec, String udf, String partitionColumn) {
+  private Table buildPartitionedTable(String desc, PartitionSpec spec) {
     File location = new File(parent, desc);
     Table table = TABLES.create(SCHEMA, spec, location.toString());
 
@@ -640,12 +605,10 @@ public class TestFilteredScan {
             .option(SparkReadOptions.VECTORIZATION_ENABLED, 
String.valueOf(vectorized))
             .load(unpartitioned.toString());
 
+    // disable fanout writers to locally order records for future verifications
     allRows
-        .coalesce(1) // ensure only 1 file per partition is written
-        .withColumn("part", callUDF(udf, column(partitionColumn)))
-        .sortWithinPartitions("part")
-        .drop("part")
         .write()
+        .option(SparkWriteOptions.FANOUT_ENABLED, "false")
         .format("iceberg")
         .mode("append")
         .save(table.location());
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index ad0984ef42..11153b3943 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -296,11 +296,13 @@ public class TestPartitionValues {
       Table table = tables.create(SUPPORTED_PRIMITIVES, spec, 
location.toString());
       table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, 
format).commit();
 
+      // disable distribution/ordering and fanout writers to preserve the 
original ordering
       sourceDF
           .write()
           .format("iceberg")
           .mode(SaveMode.Append)
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, 
"false")
+          .option(SparkWriteOptions.FANOUT_ENABLED, "false")
           .save(location.toString());
 
       List<Row> actual =
@@ -374,11 +376,13 @@ public class TestPartitionValues {
       Table table = tables.create(nestedSchema, spec, location.toString());
       table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT, 
format).commit();
 
+      // disable distribution/ordering and fanout writers to preserve the 
original ordering
       sourceDF
           .write()
           .format("iceberg")
           .mode(SaveMode.Append)
           .option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, 
"false")
+          .option(SparkWriteOptions.FANOUT_ENABLED, "false")
           .save(location.toString());
 
       List<Row> actual =
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 6c96a33a25..4b57dcd3c8 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -167,6 +167,7 @@ public class TestRequiredDistributionAndOrdering extends 
SparkCatalogTestBase {
                 inputDF
                     .writeTo(tableName)
                     
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+                    .option(SparkWriteOptions.FANOUT_ENABLED, "false")
                     .append())
         .cause()
         .isInstanceOf(IllegalStateException.class)

Reply via email to