This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 9a0191e8de Spark 3.4: Fix rewriting manifests for evolved unpartitioned V1 tables (#9599)
9a0191e8de is described below
commit 9a0191e8de151864f27de00b30d6d1bae634a534
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 1 09:36:42 2024 -0800
Spark 3.4: Fix rewriting manifests for evolved unpartitioned V1 tables (#9599)
This change backports #9015 to Spark 3.4.
---
.../spark/actions/RewriteManifestsSparkAction.java | 2 +-
.../spark/actions/TestRewriteManifestsAction.java | 63 ++++++++++++++++++++--
2 files changed, 61 insertions(+), 4 deletions(-)
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index af442ec300..0c08324a1a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -179,7 +179,7 @@ public class RewriteManifestsSparkAction
Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
List<ManifestFile> newManifests;
- if (spec.fields().size() < 1) {
+ if (spec.isUnpartitioned()) {
newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF,
targetNumManifests);
} else {
newManifests = writeManifestsForPartitionedTable(manifestEntryDF,
targetNumManifests);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 9ae1954d68..d3932c2e82 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -44,8 +44,10 @@ import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
@@ -598,6 +600,55 @@ public class TestRewriteManifestsAction extends SparkTestBase {
Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
}
+ @Test
+ public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws
IOException {
+ assumeThat(formatVersion).isEqualTo(1);
+
+ PartitionSpec spec =
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
snapshotIdInheritanceEnabled);
+ options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ table.updateSpec().removeField("c3").commit();
+
+ assertThat(table.spec().fields()).hasSize(1).allMatch(field ->
field.transform().isVoid());
+
+ List<DataFile> dataFiles = Lists.newArrayList();
+ for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+ dataFiles.add(newDataFile(table, TestHelpers.Row.of(new Object[]
{null})));
+ }
+ ManifestFile appendManifest = writeManifest(table, dataFiles);
+ table.newFastAppend().appendManifest(appendManifest).commit();
+
+ List<ManifestFile> originalManifests =
table.currentSnapshot().allManifests(table.io());
+ ManifestFile originalManifest =
Iterables.getOnlyElement(originalManifests);
+
+ // set the target manifest size to a small value to force splitting
records into multiple files
+ table
+ .updateProperties()
+ .set(
+ TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+ String.valueOf(originalManifest.length() / 2))
+ .commit();
+
+ SparkActions actions = SparkActions.get();
+
+ RewriteManifests.Result result =
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
+
+ assertThat(result.rewrittenManifests()).hasSize(1);
+ assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
+ assertManifestsLocation(result.addedManifests());
+
+ List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
+ assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
+ }
+
private void writeRecords(List<ThreeColumnRecord> records) {
Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
writeDF(df);
@@ -657,11 +708,17 @@ public class TestRewriteManifestsAction extends SparkTestBase {
}
private DataFile newDataFile(Table table, String partitionPath) {
+ return newDataFileBuilder(table).withPartitionPath(partitionPath).build();
+ }
+
+ private DataFile newDataFile(Table table, StructLike partition) {
+ return newDataFileBuilder(table).withPartition(partition).build();
+ }
+
+ private DataFiles.Builder newDataFileBuilder(Table table) {
return DataFiles.builder(table.spec())
.withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
.withFileSizeInBytes(10)
- .withPartitionPath(partitionPath)
- .withRecordCount(1)
- .build();
+ .withRecordCount(1);
}
}