This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new b0bf62a448 Spark 3.5: Use rolling manifest writers when optimizing metadata (#8972)
b0bf62a448 is described below

commit b0bf62a448617bd5f57ca72c2648452e6600fa20
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 3 19:19:33 2023 -0700

    Spark 3.5: Use rolling manifest writers when optimizing metadata (#8972)
---
 .../spark/actions/RewriteManifestsSparkAction.java | 231 +++++++++------------
 .../spark/actions/TestRewriteManifestsAction.java  |  61 ++++--
 2 files changed, 134 insertions(+), 158 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index bc2ef23067..af442ec300 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -20,8 +20,7 @@ package org.apache.iceberg.spark.actions;
 
 import static org.apache.iceberg.MetadataTableType.ENTRIES;
 
-import java.io.IOException;
-import java.util.Collections;
+import java.io.Serializable;
 import java.util.List;
 import java.util.UUID;
 import java.util.function.Function;
@@ -36,6 +35,7 @@ import org.apache.iceberg.ManifestFiles;
 import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.RollingManifestWriter;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.TableOperations;
@@ -167,20 +167,7 @@ public class RewriteManifestsSparkAction
           .build();
     }
 
-    long totalSizeBytes = 0L;
-    int numEntries = 0;
-
-    for (ManifestFile manifest : matchingManifests) {
-      ValidationException.check(
-          hasFileCounts(manifest), "No file counts in manifest: %s", 
manifest.path());
-
-      totalSizeBytes += manifest.length();
-      numEntries +=
-          manifest.addedFilesCount() + manifest.existingFilesCount() + 
manifest.deletedFilesCount();
-    }
-
-    int targetNumManifests = targetNumManifests(totalSizeBytes);
-    int targetNumManifestEntries = targetNumManifestEntries(numEntries, 
targetNumManifests);
+    int targetNumManifests = 
targetNumManifests(totalSizeBytes(matchingManifests));
 
     if (targetNumManifests == 1 && matchingManifests.size() == 1) {
       return ImmutableRewriteManifests.Result.builder()
@@ -195,9 +182,7 @@ public class RewriteManifestsSparkAction
     if (spec.fields().size() < 1) {
       newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF, 
targetNumManifests);
     } else {
-      newManifests =
-          writeManifestsForPartitionedTable(
-              manifestEntryDF, targetNumManifests, targetNumManifestEntries);
+      newManifests = writeManifestsForPartitionedTable(manifestEntryDF, 
targetNumManifests);
     }
 
     replaceManifests(matchingManifests, newManifests);
@@ -233,41 +218,24 @@ public class RewriteManifestsSparkAction
   private List<ManifestFile> writeManifestsForUnpartitionedTable(
       Dataset<Row> manifestEntryDF, int numManifests) {
 
-    Broadcast<Table> tableBroadcast =
-        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) 
manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-
-    // we rely only on the target number of manifests for unpartitioned tables
-    // as we should not worry about having too much metadata per partition
-    long maxNumManifestEntries = Long.MAX_VALUE;
+    Types.StructType partitionType = spec.partitionType();
 
     return manifestEntryDF
         .repartition(numManifests)
         .mapPartitions(
-            toManifests(
-                tableBroadcast,
-                maxNumManifestEntries,
-                outputLocation,
-                formatVersion,
-                combinedPartitionType,
-                spec,
-                sparkType),
+            toManifests(manifestWriters(), combinedPartitionType, 
partitionType, sparkType),
             manifestEncoder)
         .collectAsList();
   }
 
   private List<ManifestFile> writeManifestsForPartitionedTable(
-      Dataset<Row> manifestEntryDF, int numManifests, int 
targetNumManifestEntries) {
+      Dataset<Row> manifestEntryDF, int numManifests) {
 
-    Broadcast<Table> tableBroadcast =
-        sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
     StructType sparkType = (StructType) 
manifestEntryDF.schema().apply("data_file").dataType();
     Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-
-    // we allow the actual size of manifests to be 10% higher if the 
estimation is not precise
-    // enough
-    long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
+    Types.StructType partitionType = spec.partitionType();
 
     return withReusableDS(
         manifestEntryDF,
@@ -276,14 +244,7 @@ public class RewriteManifestsSparkAction
           return df.repartitionByRange(numManifests, partitionColumn)
               .sortWithinPartitions(partitionColumn)
               .mapPartitions(
-                  toManifests(
-                      tableBroadcast,
-                      maxNumManifestEntries,
-                      outputLocation,
-                      formatVersion,
-                      combinedPartitionType,
-                      spec,
-                      sparkType),
+                  toManifests(manifestWriters(), combinedPartitionType, 
partitionType, sparkType),
                   manifestEncoder)
               .collectAsList();
         });
@@ -319,8 +280,16 @@ public class RewriteManifestsSparkAction
     return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) / 
targetManifestSizeBytes);
   }
 
-  private int targetNumManifestEntries(int numEntries, int numManifests) {
-    return (numEntries + numManifests - 1) / numManifests;
+  private long totalSizeBytes(Iterable<ManifestFile> manifests) {
+    long totalSizeBytes = 0L;
+
+    for (ManifestFile manifest : manifests) {
+      ValidationException.check(
+          hasFileCounts(manifest), "No file counts in manifest: %s", 
manifest.path());
+      totalSizeBytes += manifest.length();
+    }
+
+    return totalSizeBytes;
   }
 
   private boolean hasFileCounts(ManifestFile manifest) {
@@ -360,104 +329,90 @@ public class RewriteManifestsSparkAction
         .run(location -> table.io().deleteFile(location));
   }
 
-  private static ManifestFile writeManifest(
-      List<Row> rows,
-      int startIndex,
-      int endIndex,
-      Broadcast<Table> tableBroadcast,
-      String location,
-      int format,
-      Types.StructType combinedPartitionType,
-      PartitionSpec spec,
-      StructType sparkType)
-      throws IOException {
+  private ManifestWriterFactory manifestWriters() {
+    return new ManifestWriterFactory(
+        sparkContext().broadcast(SerializableTableWithSize.copyOf(table)),
+        formatVersion,
+        spec.specId(),
+        outputLocation,
+        // allow the actual size of manifests to be 20% higher as the 
estimation is not precise
+        (long) (1.2 * targetManifestSizeBytes));
+  }
 
-    String manifestName = "optimized-m-" + UUID.randomUUID();
-    Path manifestPath = new Path(location, manifestName);
-    OutputFile outputFile =
-        tableBroadcast
-            .value()
-            .io()
-            
.newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
+      ManifestWriterFactory writers,
+      Types.StructType combinedPartitionType,
+      Types.StructType partitionType,
+      StructType sparkType) {
 
-    Types.StructType combinedFileType = 
DataFile.getType(combinedPartitionType);
-    Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
-    SparkDataFile wrapper = new SparkDataFile(combinedFileType, 
manifestFileType, sparkType);
+    return rows -> {
+      Types.StructType combinedFileType = 
DataFile.getType(combinedPartitionType);
+      Types.StructType manifestFileType = DataFile.getType(partitionType);
+      SparkDataFile wrapper = new SparkDataFile(combinedFileType, 
manifestFileType, sparkType);
+
+      RollingManifestWriter<DataFile> writer = 
writers.newRollingManifestWriter();
+
+      try {
+        while (rows.hasNext()) {
+          Row row = rows.next();
+          long snapshotId = row.getLong(0);
+          long sequenceNumber = row.getLong(1);
+          Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
+          Row file = row.getStruct(3);
+          writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, 
fileSequenceNumber);
+        }
+      } finally {
+        writer.close();
+      }
 
-    ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec, 
outputFile, null);
+      return writer.toManifestFiles().iterator();
+    };
+  }
 
-    try {
-      for (int index = startIndex; index < endIndex; index++) {
-        Row row = rows.get(index);
-        long snapshotId = row.getLong(0);
-        long sequenceNumber = row.getLong(1);
-        Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
-        Row file = row.getStruct(3);
-        writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber, 
fileSequenceNumber);
-      }
-    } finally {
-      writer.close();
+  private static class ManifestWriterFactory implements Serializable {
+    private final Broadcast<Table> tableBroadcast;
+    private final int formatVersion;
+    private final int specId;
+    private final String outputLocation;
+    private final long maxManifestSizeBytes;
+
+    ManifestWriterFactory(
+        Broadcast<Table> tableBroadcast,
+        int formatVersion,
+        int specId,
+        String outputLocation,
+        long maxManifestSizeBytes) {
+      this.tableBroadcast = tableBroadcast;
+      this.formatVersion = formatVersion;
+      this.specId = specId;
+      this.outputLocation = outputLocation;
+      this.maxManifestSizeBytes = maxManifestSizeBytes;
     }
 
-    return writer.toManifestFile();
-  }
+    public RollingManifestWriter<DataFile> newRollingManifestWriter() {
+      return new RollingManifestWriter<>(this::newManifestWriter, 
maxManifestSizeBytes);
+    }
 
-  private static MapPartitionsFunction<Row, ManifestFile> toManifests(
-      Broadcast<Table> tableBroadcast,
-      long maxNumManifestEntries,
-      String location,
-      int format,
-      Types.StructType combinedPartitionType,
-      PartitionSpec spec,
-      StructType sparkType) {
+    private ManifestWriter<DataFile> newManifestWriter() {
+      return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
+    }
 
-    return rows -> {
-      List<Row> rowsAsList = Lists.newArrayList(rows);
+    private PartitionSpec spec() {
+      return table().specs().get(specId);
+    }
 
-      if (rowsAsList.isEmpty()) {
-        return Collections.emptyIterator();
-      }
+    private OutputFile newOutputFile() {
+      return table().io().newOutputFile(newManifestLocation());
+    }
 
-      List<ManifestFile> manifests = Lists.newArrayList();
-      if (rowsAsList.size() <= maxNumManifestEntries) {
-        manifests.add(
-            writeManifest(
-                rowsAsList,
-                0,
-                rowsAsList.size(),
-                tableBroadcast,
-                location,
-                format,
-                combinedPartitionType,
-                spec,
-                sparkType));
-      } else {
-        int midIndex = rowsAsList.size() / 2;
-        manifests.add(
-            writeManifest(
-                rowsAsList,
-                0,
-                midIndex,
-                tableBroadcast,
-                location,
-                format,
-                combinedPartitionType,
-                spec,
-                sparkType));
-        manifests.add(
-            writeManifest(
-                rowsAsList,
-                midIndex,
-                rowsAsList.size(),
-                tableBroadcast,
-                location,
-                format,
-                combinedPartitionType,
-                spec,
-                sparkType));
-      }
+    private String newManifestLocation() {
+      String fileName = FileFormat.AVRO.addExtension("optimized-m-" + 
UUID.randomUUID());
+      Path filePath = new Path(outputLocation, fileName);
+      return filePath.toString();
+    }
 
-      return manifests.iterator();
-    };
+    private Table table() {
+      return tableBroadcast.value();
+    }
   }
 }
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4ce5ba4e9d..9ae1954d68 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -34,9 +34,13 @@ import java.io.File;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
@@ -45,6 +49,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.actions.RewriteManifests;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -412,16 +417,12 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
     options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED, 
snapshotIdInheritanceEnabled);
     Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
 
-    // all records belong to the same partition
-    List<ThreeColumnRecord> records = Lists.newArrayList();
-    for (int i = 0; i < 50; i++) {
-      records.add(new ThreeColumnRecord(i, String.valueOf(i), "0"));
+    List<DataFile> dataFiles = Lists.newArrayList();
+    for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+      dataFiles.add(newDataFile(table, "c3=" + fileOrdinal));
     }
-    Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
-    // repartition to create separate files
-    writeDF(df.repartition(50, df.col("c1")));
-
-    table.refresh();
+    ManifestFile appendManifest = writeManifest(table, dataFiles);
+    table.newFastAppend().appendManifest(appendManifest).commit();
 
     List<ManifestFile> manifests = 
table.currentSnapshot().allManifests(table.io());
     Assert.assertEquals("Should have 1 manifests before rewrite", 1, 
manifests.size());
@@ -446,22 +447,14 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
             .stagingLocation(stagingLocation)
             .execute();
 
-    Assert.assertEquals(
-        "Action should rewrite 1 manifest", 1, 
Iterables.size(result.rewrittenManifests()));
-    Assert.assertEquals(
-        "Action should add 2 manifests", 2, 
Iterables.size(result.addedManifests()));
+    Assertions.assertThat(result.rewrittenManifests()).hasSize(1);
+    
Assertions.assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
     assertManifestsLocation(result.addedManifests(), stagingLocation);
 
     table.refresh();
 
     List<ManifestFile> newManifests = 
table.currentSnapshot().allManifests(table.io());
-    Assert.assertEquals("Should have 2 manifests after rewrite", 2, 
newManifests.size());
-
-    Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
-    List<ThreeColumnRecord> actualRecords =
-        resultDF.sort("c1", 
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-
-    Assert.assertEquals("Rows must match", records, actualRecords);
+    Assertions.assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2);
   }
 
   @Test
@@ -643,4 +636,32 @@ public class TestRewriteManifestsAction extends 
SparkTestBase {
       assertThat(manifests).allMatch(manifest -> 
manifest.path().startsWith(tableLocation));
     }
   }
+
+  private ManifestFile writeManifest(Table table, List<DataFile> files) throws 
IOException {
+    File manifestFile = temp.newFile("generated-manifest.avro");
+    Assert.assertTrue(manifestFile.delete());
+    OutputFile outputFile = 
table.io().newOutputFile(manifestFile.getCanonicalPath());
+
+    ManifestWriter<DataFile> writer =
+        ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
+
+    try {
+      for (DataFile file : files) {
+        writer.add(file);
+      }
+    } finally {
+      writer.close();
+    }
+
+    return writer.toManifestFile();
+  }
+
+  private DataFile newDataFile(Table table, String partitionPath) {
+    return DataFiles.builder(table.spec())
+        .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
+        .withFileSizeInBytes(10)
+        .withPartitionPath(partitionPath)
+        .withRecordCount(1)
+        .build();
+  }
 }

Reply via email to