This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a0191e8de Spark 3.4: Fix rewriting manifests for evolved unpartitioned V1 tables (#9599)
9a0191e8de is described below

commit 9a0191e8de151864f27de00b30d6d1bae634a534
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Thu Feb 1 09:36:42 2024 -0800

    Spark 3.4: Fix rewriting manifests for evolved unpartitioned V1 tables (#9599)
    
    This change backports #9015 to Spark 3.4.
---
 .../spark/actions/RewriteManifestsSparkAction.java |  2 +-
 .../spark/actions/TestRewriteManifestsAction.java  | 63 ++++++++++++++++++++--
 2 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index af442ec300..0c08324a1a 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -179,7 +179,7 @@ public class RewriteManifestsSparkAction
     Dataset<Row> manifestEntryDF = buildManifestEntryDF(matchingManifests);
 
     List<ManifestFile> newManifests;
-    if (spec.fields().size() < 1) {
+    if (spec.isUnpartitioned()) {
       newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, 
targetNumManifests);
     } else {
       newManifests = writeManifestsForPartitionedTable(manifestEntryDF, 
targetNumManifests);
diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 9ae1954d68..d3932c2e82 100644
--- a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -44,8 +44,10 @@ import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.TestHelpers;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
@@ -598,6 +600,55 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     Assert.assertEquals("Rows must match", expectedRecords, actualRecords);
   }
 
+  @Test
+  public void testRewriteLargeManifestsEvolvedUnpartitionedV1Table() throws 
IOException {
+    assumeThat(formatVersion).isEqualTo(1);
+
+    PartitionSpec spec = 
PartitionSpec.builderFor(SCHEMA).identity("c3").build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
snapshotIdInheritanceEnabled);
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    table.updateSpec().removeField("c3").commit();
+
+    assertThat(table.spec().fields()).hasSize(1).allMatch(field -> 
field.transform().isVoid());
+
+    List<DataFile> dataFiles = Lists.newArrayList();
+    for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+      dataFiles.add(newDataFile(table, TestHelpers.Row.of(new Object[] 
{null})));
+    }
+    ManifestFile appendManifest = writeManifest(table, dataFiles);
+    table.newFastAppend().appendManifest(appendManifest).commit();
+
+    List<ManifestFile> originalManifests = 
table.currentSnapshot().allManifests(table.io());
+    ManifestFile originalManifest = 
Iterables.getOnlyElement(originalManifests);
+
+    // set the target manifest size to a small value to force splitting 
records into multiple files
+    table
+        .updateProperties()
+        .set(
+            TableProperties.MANIFEST_TARGET_SIZE_BYTES,
+            String.valueOf(originalManifest.length() / 2))
+        .commit();
+
+    SparkActions actions = SparkActions.get();
+
+    RewriteManifests.Result result =
+        actions
+            .rewriteManifests(table)
+            .rewriteIf(manifest -> true)
+            .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+            .execute();
+
+    assertThat(result.rewrittenManifests()).hasSize(1);
+    assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
+    assertManifestsLocation(result.addedManifests());
+
+    List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
+    assertThat(manifests).hasSizeGreaterThanOrEqualTo(2);
+  }
+
   private void writeRecords(List<ThreeColumnRecord> records) {
     Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
     writeDF(df);
@@ -657,11 +708,17 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
   }
 
   private DataFile newDataFile(Table table, String partitionPath) {
+    return newDataFileBuilder(table).withPartitionPath(partitionPath).build();
+  }
+
+  private DataFile newDataFile(Table table, StructLike partition) {
+    return newDataFileBuilder(table).withPartition(partition).build();
+  }
+
+  private DataFiles.Builder newDataFileBuilder(Table table) {
     return DataFiles.builder(table.spec())
         .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
         .withFileSizeInBytes(10)
-        .withPartitionPath(partitionPath)
-        .withRecordCount(1)
-        .build();
+        .withRecordCount(1);
   }
 }

Reply via email to