This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new b7296a755b Spark 3.5: Use fanout writers for unsorted tables by default (#8621)
b7296a755b is described below
commit b7296a755ba9c3e9236a43f99225c580cd9abdc0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Sep 26 16:04:30 2023 -0700
Spark 3.5: Use fanout writers for unsorted tables by default (#8621)
---
.../TestRequiredDistributionAndOrdering.java | 1 +
.../iceberg/spark/extensions/TestWriteAborts.java | 1 +
.../org/apache/iceberg/spark/SparkWriteConf.java | 13 +-
.../spark/source/SparkPositionDeltaWrite.java | 13 +-
.../apache/iceberg/spark/source/SparkWrite.java | 14 +-
.../TestSparkDistributionAndOrderingUtil.java | 409 +++++----------------
.../iceberg/spark/source/TestFilteredScan.java | 59 +--
.../iceberg/spark/source/TestPartitionValues.java | 4 +
.../TestRequiredDistributionAndOrdering.java | 1 +
9 files changed, 143 insertions(+), 372 deletions(-)
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
index fcdf9bf992..809de08379 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRequiredDistributionAndOrdering.java
@@ -195,6 +195,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkExtensionsTestBase
inputDF
.writeTo(tableName)
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.append();
} catch (NoSuchTableException e) {
throw new RuntimeException(e);
diff --git
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
index 15484f45f8..4d87099572 100644
---
a/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
+++
b/spark/v3.5/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestWriteAborts.java
@@ -112,6 +112,7 @@ public class TestWriteAborts extends
SparkExtensionsTestBase {
.sortWithinPartitions("id")
.writeTo(tableName)
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.append())
.isInstanceOf(SparkException.class)
.hasMessageContaining("Encountered records that belong to already
closed files");
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
index 28a586e3e9..834e839874 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkWriteConf.java
@@ -172,12 +172,21 @@ public class SparkWriteConf {
.parse();
}
- public boolean fanoutWriterEnabled() {
+ public boolean useFanoutWriter(SparkWriteRequirements writeRequirements) {
+ boolean defaultValue = !writeRequirements.hasOrdering();
+ return fanoutWriterEnabled(defaultValue);
+ }
+
+ private boolean fanoutWriterEnabled() {
+ return fanoutWriterEnabled(true /* enabled by default */);
+ }
+
+ private boolean fanoutWriterEnabled(boolean defaultValue) {
return confParser
.booleanConf()
.option(SparkWriteOptions.FANOUT_ENABLED)
.tableProperty(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED)
-
.defaultValue(TableProperties.SPARK_WRITE_PARTITIONED_FANOUT_ENABLED_DEFAULT)
+ .defaultValue(defaultValue)
.parse();
}
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
index 9fea33948b..a8145c2aba 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeltaWrite.java
@@ -406,11 +406,10 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
Table table, SparkFileWriterFactory writers, OutputFileFactory files,
Context context) {
FileIO io = table.io();
- boolean fanoutEnabled = context.fanoutWriterEnabled();
- boolean inputOrdered = context.inputOrdered();
+ boolean useFanoutWriter = context.useFanoutWriter();
long targetFileSize = context.targetDataFileSize();
- if (table.spec().isPartitioned() && fanoutEnabled && !inputOrdered) {
+ if (table.spec().isPartitioned() && useFanoutWriter) {
return new FanoutDataWriter<>(writers, files, io, targetFileSize);
} else {
return new ClusteredDataWriter<>(writers, files, io, targetFileSize);
@@ -670,7 +669,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
private final FileFormat deleteFileFormat;
private final long targetDeleteFileSize;
private final String queryId;
- private final boolean fanoutWriterEnabled;
+ private final boolean useFanoutWriter;
private final boolean inputOrdered;
Context(
@@ -687,7 +686,7 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
this.targetDeleteFileSize = writeConf.targetDeleteFileSize();
this.metadataSparkType = info.metadataSchema().get();
this.queryId = info.queryId();
- this.fanoutWriterEnabled = writeConf.fanoutWriterEnabled();
+ this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
this.inputOrdered = writeRequirements.hasOrdering();
}
@@ -723,8 +722,8 @@ class SparkPositionDeltaWrite implements DeltaWrite,
RequiresDistributionAndOrde
return queryId;
}
- boolean fanoutWriterEnabled() {
- return fanoutWriterEnabled;
+ boolean useFanoutWriter() {
+ return useFanoutWriter;
}
boolean inputOrdered() {
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
index 15881098e7..802c789ce8 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
@@ -98,7 +98,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
private final Schema writeSchema;
private final StructType dsSchema;
private final Map<String, String> extraSnapshotMetadata;
- private final boolean partitionedFanoutEnabled;
+ private final boolean useFanoutWriter;
private final SparkWriteRequirements writeRequirements;
private final Map<String, String> writeProperties;
@@ -126,7 +126,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
this.extraSnapshotMetadata = writeConf.extraSnapshotMetadata();
- this.partitionedFanoutEnabled = writeConf.fanoutWriterEnabled();
+ this.useFanoutWriter = writeConf.useFanoutWriter(writeRequirements);
this.writeRequirements = writeRequirements;
this.outputSpecId = writeConf.outputSpecId();
this.writeProperties = writeConf.writeProperties();
@@ -188,7 +188,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
targetFileSize,
writeSchema,
dsSchema,
- partitionedFanoutEnabled,
+ useFanoutWriter,
writeProperties);
}
@@ -617,7 +617,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
private final long targetFileSize;
private final Schema writeSchema;
private final StructType dsSchema;
- private final boolean partitionedFanoutEnabled;
+ private final boolean useFanoutWriter;
private final String queryId;
private final Map<String, String> writeProperties;
@@ -629,7 +629,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
long targetFileSize,
Schema writeSchema,
StructType dsSchema,
- boolean partitionedFanoutEnabled,
+ boolean useFanoutWriter,
Map<String, String> writeProperties) {
this.tableBroadcast = tableBroadcast;
this.format = format;
@@ -637,7 +637,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
this.targetFileSize = targetFileSize;
this.writeSchema = writeSchema;
this.dsSchema = dsSchema;
- this.partitionedFanoutEnabled = partitionedFanoutEnabled;
+ this.useFanoutWriter = useFanoutWriter;
this.queryId = queryId;
this.writeProperties = writeProperties;
}
@@ -678,7 +678,7 @@ abstract class SparkWrite implements Write,
RequiresDistributionAndOrdering {
writeSchema,
dsSchema,
targetFileSize,
- partitionedFanoutEnabled);
+ useFanoutWriter);
}
}
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
index 79374edc3f..7ed34d4016 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestSparkDistributionAndOrderingUtil.java
@@ -241,6 +241,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -252,23 +254,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
};
checkWriteDistributionAndOrdering(table, expectedDistribution,
expectedOrdering);
- }
-
- @Test
- public void testDefaultWritePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
- Table table = validationCatalog.loadTable(tableIdent);
-
- table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
-
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkWriteDistributionAndOrdering(table, expectedDistribution,
EMPTY_ORDERING);
}
@@ -285,6 +272,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(WRITE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -296,27 +285,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
};
checkWriteDistributionAndOrdering(table, expectedDistribution,
expectedOrdering);
- }
- @Test
- public void testHashWritePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
-
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkWriteDistributionAndOrdering(table, expectedDistribution,
EMPTY_ORDERING);
}
@@ -333,6 +303,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(WRITE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -342,31 +314,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
checkWriteDistributionAndOrdering(table, expectedDistribution,
expectedOrdering);
- }
-
- @Test
- public void testRangeWritePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(WRITE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
-
- SortOrder[] expectedOrdering =
- new SortOrder[] {
- Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
- Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
- };
-
- Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
+ enableFanoutWriters(table);
checkWriteDistributionAndOrdering(table, expectedDistribution,
EMPTY_ORDERING);
}
@@ -434,6 +383,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.replaceSortOrder().asc("id").commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -443,29 +394,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
checkWriteDistributionAndOrdering(table, expectedDistribution,
expectedOrdering);
- }
-
- @Test
- public void testRangeWritePartitionedSortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date)",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table.replaceSortOrder().asc("id").commit();
-
- table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
-
- SortOrder[] expectedOrdering =
- new SortOrder[] {
- Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
- Expressions.sort(Expressions.column("id"), SortDirection.ASCENDING)
- };
- Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
+ enableFanoutWriters(table);
checkWriteDistributionAndOrdering(table, expectedDistribution,
expectedOrdering);
}
@@ -642,6 +572,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -653,23 +585,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
};
checkCopyOnWriteDistributionAndOrdering(table, DELETE,
expectedDistribution, expectedOrdering);
- }
- @Test
- public void testDefaultCopyOnWriteDeletePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
-
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, DELETE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -686,6 +603,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -694,23 +613,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(
table, DELETE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
- }
-
- @Test
- public void testNoneCopyOnWriteDeletePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(
table, DELETE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
@@ -728,6 +632,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -739,27 +645,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
};
checkCopyOnWriteDistributionAndOrdering(table, DELETE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testHashCopyOnWriteDeletePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
- table
- .updateProperties()
- .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
-
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, DELETE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -776,6 +663,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -785,30 +674,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
checkCopyOnWriteDistributionAndOrdering(table, DELETE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testRangeCopyOnWriteDeletePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(DELETE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
-
- SortOrder[] expectedOrdering =
- new SortOrder[] {
- Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
- Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
- };
- Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, DELETE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1086,6 +953,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -1097,23 +966,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
};
checkCopyOnWriteDistributionAndOrdering(table, UPDATE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testDefaultCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, UPDATE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1130,6 +984,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -1138,23 +994,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(
table, UPDATE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
- }
- @Test
- public void testNoneCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(
table, UPDATE, UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
@@ -1172,6 +1013,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -1183,27 +1026,8 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
};
checkCopyOnWriteDistributionAndOrdering(table, UPDATE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testHashCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, UPDATE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1220,6 +1044,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -1229,30 +1055,8 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
checkCopyOnWriteDistributionAndOrdering(table, UPDATE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testRangeCopyOnWriteUpdatePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(UPDATE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
-
- SortOrder[] expectedOrdering =
- new SortOrder[] {
- Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
- Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
- };
- Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, UPDATE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1526,6 +1330,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -1537,23 +1343,8 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
};
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testDefaultCopyOnWriteMergePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
- Table table = validationCatalog.loadTable(tableIdent);
-
- table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
-
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1570,6 +1361,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -1578,23 +1371,8 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
checkCopyOnWriteDistributionAndOrdering(
table, MERGE, UNSPECIFIED_DISTRIBUTION, expectedOrdering);
- }
-
- @Test
- public void testNoneCopyOnWriteMergePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
- table
- .updateProperties()
- .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_NONE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
UNSPECIFIED_DISTRIBUTION, EMPTY_ORDERING);
}
@@ -1611,6 +1389,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
@@ -1622,27 +1402,8 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
};
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
expectedDistribution, expectedOrdering);
- }
- @Test
- public void testHashCopyOnWriteMergePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_HASH)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
-
- Expression[] expectedClustering =
- new Expression[] {Expressions.identity("date"),
Expressions.days("ts")};
- Distribution expectedDistribution =
Distributions.clustered(expectedClustering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1659,6 +1420,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
@@ -1668,30 +1431,8 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
expectedDistribution, expectedOrdering);
- }
-
- @Test
- public void testRangeCopyOnWriteMergePartitionedUnsortedTableFanout() {
- sql(
- "CREATE TABLE %s (id BIGINT, data STRING, date DATE, ts TIMESTAMP) "
- + "USING iceberg "
- + "PARTITIONED BY (date, days(ts))",
- tableName);
-
- Table table = validationCatalog.loadTable(tableIdent);
-
- table
- .updateProperties()
- .set(MERGE_DISTRIBUTION_MODE, WRITE_DISTRIBUTION_MODE_RANGE)
- .set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED, "true")
- .commit();
- SortOrder[] expectedOrdering =
- new SortOrder[] {
- Expressions.sort(Expressions.column("date"),
SortDirection.ASCENDING),
- Expressions.sort(Expressions.days("ts"), SortDirection.ASCENDING)
- };
- Distribution expectedDistribution =
Distributions.ordered(expectedOrdering);
+ enableFanoutWriters(table);
checkCopyOnWriteDistributionAndOrdering(table, MERGE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -1838,6 +1579,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table,
DELETE,
@@ -1858,6 +1601,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table, DELETE, UNSPECIFIED_DISTRIBUTION,
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
@@ -1875,6 +1620,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table,
DELETE,
@@ -1895,6 +1642,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
Distribution expectedDistribution =
Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
checkPositionDeltaDistributionAndOrdering(
@@ -1915,6 +1664,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table,
DELETE,
@@ -1939,6 +1690,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table, DELETE, UNSPECIFIED_DISTRIBUTION,
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
@@ -1960,6 +1713,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table,
DELETE,
@@ -1984,6 +1739,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(DELETE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
Distribution expectedDistribution =
Distributions.ordered(SPEC_ID_PARTITION_ORDERING);
checkPositionDeltaDistributionAndOrdering(
@@ -2063,6 +1820,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table,
UPDATE,
@@ -2083,6 +1842,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table, UPDATE, UNSPECIFIED_DISTRIBUTION,
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
@@ -2100,6 +1861,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table,
UPDATE,
@@ -2120,12 +1883,14 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
Distribution expectedDistribution =
Distributions.ordered(SPEC_ID_PARTITION_FILE_ORDERING);
checkPositionDeltaDistributionAndOrdering(
table, UPDATE, expectedDistribution,
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
- table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
+ enableFanoutWriters(table);
checkPositionDeltaDistributionAndOrdering(table, UPDATE,
expectedDistribution, EMPTY_ORDERING);
}
@@ -2263,6 +2028,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {
Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2306,6 +2073,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(
@@ -2341,6 +2110,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {
Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2384,6 +2155,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(UPDATE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedDistributionOrdering =
new SortOrder[] {
Expressions.sort(
@@ -2647,6 +2420,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {
Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2671,6 +2446,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
checkPositionDeltaDistributionAndOrdering(
table, MERGE, UNSPECIFIED_DISTRIBUTION,
SPEC_ID_PARTITION_FILE_POSITION_ORDERING);
@@ -2688,6 +2465,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {
Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2712,6 +2491,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedDistributionOrdering =
new SortOrder[] {
Expressions.sort(
@@ -2877,6 +2658,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
Table table = validationCatalog.loadTable(tableIdent);
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {
Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2919,6 +2702,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_NONE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedOrdering =
new SortOrder[] {
Expressions.sort(
@@ -2954,6 +2739,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_HASH).commit();
+ disableFanoutWriters(table);
+
Expression[] expectedClustering =
new Expression[] {
Expressions.column(MetadataColumns.SPEC_ID.name()),
@@ -2996,6 +2783,8 @@ public class TestSparkDistributionAndOrderingUtil extends
SparkTestBaseWithCatal
table.updateProperties().set(MERGE_DISTRIBUTION_MODE,
WRITE_DISTRIBUTION_MODE_RANGE).commit();
+ disableFanoutWriters(table);
+
SortOrder[] expectedDistributionOrdering =
new SortOrder[] {
Expressions.sort(
@@ -3227,6 +3016,10 @@ public class TestSparkDistributionAndOrderingUtil
extends SparkTestBaseWithCatal
Assert.assertArrayEquals("Ordering must match", expectedOrdering,
ordering);
}
+ private void disableFanoutWriters(Table table) {
+ table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"false").commit();
+ }
+
private void enableFanoutWriters(Table table) {
table.updateProperties().set(SPARK_WRITE_PARTITIONED_FANOUT_ENABLED,
"true").commit();
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
index e8af5e51ec..0efec160e8 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestFilteredScan.java
@@ -21,18 +21,13 @@ package org.apache.iceberg.spark.source;
import static org.apache.iceberg.Files.localOutput;
import static org.apache.iceberg.PlanningMode.DISTRIBUTED;
import static org.apache.iceberg.PlanningMode.LOCAL;
-import static
org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp;
-import static org.apache.spark.sql.functions.callUDF;
-import static org.apache.spark.sql.functions.column;
import java.io.File;
import java.io.IOException;
-import java.sql.Timestamp;
import java.time.OffsetDateTime;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
-import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
@@ -52,14 +47,13 @@ import
org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.SparkReadOptions;
+import org.apache.iceberg.spark.SparkWriteOptions;
import org.apache.iceberg.spark.data.GenericsHelpers;
-import org.apache.iceberg.transforms.Transforms;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
-import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.sql.connector.expressions.filter.Predicate;
import org.apache.spark.sql.connector.read.Batch;
@@ -73,9 +67,6 @@ import org.apache.spark.sql.sources.GreaterThan;
import org.apache.spark.sql.sources.LessThan;
import org.apache.spark.sql.sources.Not;
import org.apache.spark.sql.sources.StringStartsWith;
-import org.apache.spark.sql.types.IntegerType$;
-import org.apache.spark.sql.types.LongType$;
-import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.assertj.core.api.Assertions;
import org.junit.AfterClass;
@@ -119,29 +110,6 @@ public class TestFilteredScan {
@BeforeClass
public static void startSpark() {
TestFilteredScan.spark =
SparkSession.builder().master("local[2]").getOrCreate();
-
- // define UDFs used by partition tests
- Function<Object, Integer> bucket4 =
Transforms.bucket(4).bind(Types.LongType.get());
- spark.udf().register("bucket4", (UDF1<Long, Integer>) bucket4::apply,
IntegerType$.MODULE$);
-
- Function<Object, Integer> day =
Transforms.day().bind(Types.TimestampType.withZone());
- spark
- .udf()
- .register(
- "ts_day",
- (UDF1<Timestamp, Integer>) timestamp ->
day.apply(fromJavaTimestamp(timestamp)),
- IntegerType$.MODULE$);
-
- Function<Object, Integer> hour =
Transforms.hour().bind(Types.TimestampType.withZone());
- spark
- .udf()
- .register(
- "ts_hour",
- (UDF1<Timestamp, Integer>) timestamp ->
hour.apply(fromJavaTimestamp(timestamp)),
- IntegerType$.MODULE$);
-
- spark.udf().register("data_ident", (UDF1<String, String>) data -> data,
StringType$.MODULE$);
- spark.udf().register("id_ident", (UDF1<Long, Long>) id -> id,
LongType$.MODULE$);
}
@AfterClass
@@ -299,7 +267,7 @@ public class TestFilteredScan {
@Test
public void testBucketPartitionedIDFilters() {
- Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID,
"bucket4", "id");
+ Table table = buildPartitionedTable("bucketed_by_id", BUCKET_BY_ID);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
@@ -329,7 +297,7 @@ public class TestFilteredScan {
@SuppressWarnings("checkstyle:AvoidNestedBlocks")
@Test
public void testDayPartitionedTimestampFilters() {
- Table table = buildPartitionedTable("partitioned_by_day",
PARTITION_BY_DAY, "ts_day", "ts");
+ Table table = buildPartitionedTable("partitioned_by_day",
PARTITION_BY_DAY);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
Batch unfiltered =
@@ -383,7 +351,7 @@ public class TestFilteredScan {
@SuppressWarnings("checkstyle:AvoidNestedBlocks")
@Test
public void testHourPartitionedTimestampFilters() {
- Table table = buildPartitionedTable("partitioned_by_hour",
PARTITION_BY_HOUR, "ts_hour", "ts");
+ Table table = buildPartitionedTable("partitioned_by_hour",
PARTITION_BY_HOUR);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
@@ -479,8 +447,7 @@ public class TestFilteredScan {
@Test
public void testPartitionedByDataStartsWithFilter() {
- Table table =
- buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA,
"data_ident", "data");
+ Table table = buildPartitionedTable("partitioned_by_data",
PARTITION_BY_DATA);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
@@ -495,8 +462,7 @@ public class TestFilteredScan {
@Test
public void testPartitionedByDataNotStartsWithFilter() {
- Table table =
- buildPartitionedTable("partitioned_by_data", PARTITION_BY_DATA,
"data_ident", "data");
+ Table table = buildPartitionedTable("partitioned_by_data",
PARTITION_BY_DATA);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
@@ -511,7 +477,7 @@ public class TestFilteredScan {
@Test
public void testPartitionedByIdStartsWith() {
- Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID,
"id_ident", "id");
+ Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
@@ -527,7 +493,7 @@ public class TestFilteredScan {
@Test
public void testPartitionedByIdNotStartsWith() {
- Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID,
"id_ident", "id");
+ Table table = buildPartitionedTable("partitioned_by_id", PARTITION_BY_ID);
CaseInsensitiveStringMap options =
new CaseInsensitiveStringMap(ImmutableMap.of("path",
table.location()));
@@ -623,8 +589,7 @@ public class TestFilteredScan {
filterable.pushPredicates(Arrays.stream(filters).map(Filter::toV2).toArray(Predicate[]::new));
}
- private Table buildPartitionedTable(
- String desc, PartitionSpec spec, String udf, String partitionColumn) {
+ private Table buildPartitionedTable(String desc, PartitionSpec spec) {
File location = new File(parent, desc);
Table table = TABLES.create(SCHEMA, spec, location.toString());
@@ -640,12 +605,10 @@ public class TestFilteredScan {
.option(SparkReadOptions.VECTORIZATION_ENABLED,
String.valueOf(vectorized))
.load(unpartitioned.toString());
+ // disable fanout writers to locally order records for future verifications
allRows
- .coalesce(1) // ensure only 1 file per partition is written
- .withColumn("part", callUDF(udf, column(partitionColumn)))
- .sortWithinPartitions("part")
- .drop("part")
.write()
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.format("iceberg")
.mode("append")
.save(table.location());
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
index ad0984ef42..11153b3943 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestPartitionValues.java
@@ -296,11 +296,13 @@ public class TestPartitionValues {
Table table = tables.create(SUPPORTED_PRIMITIVES, spec,
location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT,
format).commit();
+ // disable distribution/ordering and fanout writers to preserve the
original ordering
sourceDF
.write()
.format("iceberg")
.mode(SaveMode.Append)
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.save(location.toString());
List<Row> actual =
@@ -374,11 +376,13 @@ public class TestPartitionValues {
Table table = tables.create(nestedSchema, spec, location.toString());
table.updateProperties().set(TableProperties.DEFAULT_FILE_FORMAT,
format).commit();
+ // disable distribution/ordering and fanout writers to preserve the
original ordering
sourceDF
.write()
.format("iceberg")
.mode(SaveMode.Append)
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING,
"false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.save(location.toString());
List<Row> actual =
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
index 6c96a33a25..4b57dcd3c8 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestRequiredDistributionAndOrdering.java
@@ -167,6 +167,7 @@ public class TestRequiredDistributionAndOrdering extends
SparkCatalogTestBase {
inputDF
.writeTo(tableName)
.option(SparkWriteOptions.USE_TABLE_DISTRIBUTION_AND_ORDERING, "false")
+ .option(SparkWriteOptions.FANOUT_ENABLED, "false")
.append())
.cause()
.isInstanceOf(IllegalStateException.class)