This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new b0bf62a448 Spark 3.5: Use rolling manifest writers when optimizing
metadata (#8972)
b0bf62a448 is described below
commit b0bf62a448617bd5f57ca72c2648452e6600fa20
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Nov 3 19:19:33 2023 -0700
Spark 3.5: Use rolling manifest writers when optimizing metadata (#8972)
---
.../spark/actions/RewriteManifestsSparkAction.java | 231 +++++++++------------
.../spark/actions/TestRewriteManifestsAction.java | 61 ++++--
2 files changed, 134 insertions(+), 158 deletions(-)
diff --git
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
index bc2ef23067..af442ec300 100644
---
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
+++
b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteManifestsSparkAction.java
@@ -20,8 +20,7 @@ package org.apache.iceberg.spark.actions;
import static org.apache.iceberg.MetadataTableType.ENTRIES;
-import java.io.IOException;
-import java.util.Collections;
+import java.io.Serializable;
import java.util.List;
import java.util.UUID;
import java.util.function.Function;
@@ -36,6 +35,7 @@ import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
+import org.apache.iceberg.RollingManifestWriter;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableOperations;
@@ -167,20 +167,7 @@ public class RewriteManifestsSparkAction
.build();
}
- long totalSizeBytes = 0L;
- int numEntries = 0;
-
- for (ManifestFile manifest : matchingManifests) {
- ValidationException.check(
- hasFileCounts(manifest), "No file counts in manifest: %s",
manifest.path());
-
- totalSizeBytes += manifest.length();
- numEntries +=
- manifest.addedFilesCount() + manifest.existingFilesCount() +
manifest.deletedFilesCount();
- }
-
- int targetNumManifests = targetNumManifests(totalSizeBytes);
- int targetNumManifestEntries = targetNumManifestEntries(numEntries,
targetNumManifests);
+ int targetNumManifests =
targetNumManifests(totalSizeBytes(matchingManifests));
if (targetNumManifests == 1 && matchingManifests.size() == 1) {
return ImmutableRewriteManifests.Result.builder()
@@ -195,9 +182,7 @@ public class RewriteManifestsSparkAction
if (spec.fields().size() < 1) {
newManifests = writeManifestsForUnpartitionedTable(manifestEntryDF,
targetNumManifests);
} else {
- newManifests =
- writeManifestsForPartitionedTable(
- manifestEntryDF, targetNumManifests, targetNumManifestEntries);
+ newManifests = writeManifestsForPartitionedTable(manifestEntryDF,
targetNumManifests);
}
replaceManifests(matchingManifests, newManifests);
@@ -233,41 +218,24 @@ public class RewriteManifestsSparkAction
private List<ManifestFile> writeManifestsForUnpartitionedTable(
Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<Table> tableBroadcast =
- sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-
- // we rely only on the target number of manifests for unpartitioned tables
- // as we should not worry about having too much metadata per partition
- long maxNumManifestEntries = Long.MAX_VALUE;
+ Types.StructType partitionType = spec.partitionType();
return manifestEntryDF
.repartition(numManifests)
.mapPartitions(
- toManifests(
- tableBroadcast,
- maxNumManifestEntries,
- outputLocation,
- formatVersion,
- combinedPartitionType,
- spec,
- sparkType),
+ toManifests(manifestWriters(), combinedPartitionType,
partitionType, sparkType),
manifestEncoder)
.collectAsList();
}
private List<ManifestFile> writeManifestsForPartitionedTable(
- Dataset<Row> manifestEntryDF, int numManifests, int
targetNumManifestEntries) {
+ Dataset<Row> manifestEntryDF, int numManifests) {
- Broadcast<Table> tableBroadcast =
- sparkContext().broadcast(SerializableTableWithSize.copyOf(table));
StructType sparkType = (StructType)
manifestEntryDF.schema().apply("data_file").dataType();
Types.StructType combinedPartitionType = Partitioning.partitionType(table);
-
- // we allow the actual size of manifests to be 10% higher if the
estimation is not precise
- // enough
- long maxNumManifestEntries = (long) (1.1 * targetNumManifestEntries);
+ Types.StructType partitionType = spec.partitionType();
return withReusableDS(
manifestEntryDF,
@@ -276,14 +244,7 @@ public class RewriteManifestsSparkAction
return df.repartitionByRange(numManifests, partitionColumn)
.sortWithinPartitions(partitionColumn)
.mapPartitions(
- toManifests(
- tableBroadcast,
- maxNumManifestEntries,
- outputLocation,
- formatVersion,
- combinedPartitionType,
- spec,
- sparkType),
+ toManifests(manifestWriters(), combinedPartitionType,
partitionType, sparkType),
manifestEncoder)
.collectAsList();
});
@@ -319,8 +280,16 @@ public class RewriteManifestsSparkAction
return (int) ((totalSizeBytes + targetManifestSizeBytes - 1) /
targetManifestSizeBytes);
}
- private int targetNumManifestEntries(int numEntries, int numManifests) {
- return (numEntries + numManifests - 1) / numManifests;
+ private long totalSizeBytes(Iterable<ManifestFile> manifests) {
+ long totalSizeBytes = 0L;
+
+ for (ManifestFile manifest : manifests) {
+ ValidationException.check(
+ hasFileCounts(manifest), "No file counts in manifest: %s",
manifest.path());
+ totalSizeBytes += manifest.length();
+ }
+
+ return totalSizeBytes;
}
private boolean hasFileCounts(ManifestFile manifest) {
@@ -360,104 +329,90 @@ public class RewriteManifestsSparkAction
.run(location -> table.io().deleteFile(location));
}
- private static ManifestFile writeManifest(
- List<Row> rows,
- int startIndex,
- int endIndex,
- Broadcast<Table> tableBroadcast,
- String location,
- int format,
- Types.StructType combinedPartitionType,
- PartitionSpec spec,
- StructType sparkType)
- throws IOException {
+ private ManifestWriterFactory manifestWriters() {
+ return new ManifestWriterFactory(
+ sparkContext().broadcast(SerializableTableWithSize.copyOf(table)),
+ formatVersion,
+ spec.specId(),
+ outputLocation,
+ // allow the actual size of manifests to be 20% higher as the
estimation is not precise
+ (long) (1.2 * targetManifestSizeBytes));
+ }
- String manifestName = "optimized-m-" + UUID.randomUUID();
- Path manifestPath = new Path(location, manifestName);
- OutputFile outputFile =
- tableBroadcast
- .value()
- .io()
-
.newOutputFile(FileFormat.AVRO.addExtension(manifestPath.toString()));
+ private static MapPartitionsFunction<Row, ManifestFile> toManifests(
+ ManifestWriterFactory writers,
+ Types.StructType combinedPartitionType,
+ Types.StructType partitionType,
+ StructType sparkType) {
- Types.StructType combinedFileType =
DataFile.getType(combinedPartitionType);
- Types.StructType manifestFileType = DataFile.getType(spec.partitionType());
- SparkDataFile wrapper = new SparkDataFile(combinedFileType,
manifestFileType, sparkType);
+ return rows -> {
+ Types.StructType combinedFileType =
DataFile.getType(combinedPartitionType);
+ Types.StructType manifestFileType = DataFile.getType(partitionType);
+ SparkDataFile wrapper = new SparkDataFile(combinedFileType,
manifestFileType, sparkType);
+
+ RollingManifestWriter<DataFile> writer =
writers.newRollingManifestWriter();
+
+ try {
+ while (rows.hasNext()) {
+ Row row = rows.next();
+ long snapshotId = row.getLong(0);
+ long sequenceNumber = row.getLong(1);
+ Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
+ Row file = row.getStruct(3);
+ writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber,
fileSequenceNumber);
+ }
+ } finally {
+ writer.close();
+ }
- ManifestWriter<DataFile> writer = ManifestFiles.write(format, spec,
outputFile, null);
+ return writer.toManifestFiles().iterator();
+ };
+ }
- try {
- for (int index = startIndex; index < endIndex; index++) {
- Row row = rows.get(index);
- long snapshotId = row.getLong(0);
- long sequenceNumber = row.getLong(1);
- Long fileSequenceNumber = row.isNullAt(2) ? null : row.getLong(2);
- Row file = row.getStruct(3);
- writer.existing(wrapper.wrap(file), snapshotId, sequenceNumber,
fileSequenceNumber);
- }
- } finally {
- writer.close();
+ private static class ManifestWriterFactory implements Serializable {
+ private final Broadcast<Table> tableBroadcast;
+ private final int formatVersion;
+ private final int specId;
+ private final String outputLocation;
+ private final long maxManifestSizeBytes;
+
+ ManifestWriterFactory(
+ Broadcast<Table> tableBroadcast,
+ int formatVersion,
+ int specId,
+ String outputLocation,
+ long maxManifestSizeBytes) {
+ this.tableBroadcast = tableBroadcast;
+ this.formatVersion = formatVersion;
+ this.specId = specId;
+ this.outputLocation = outputLocation;
+ this.maxManifestSizeBytes = maxManifestSizeBytes;
}
- return writer.toManifestFile();
- }
+ public RollingManifestWriter<DataFile> newRollingManifestWriter() {
+ return new RollingManifestWriter<>(this::newManifestWriter,
maxManifestSizeBytes);
+ }
- private static MapPartitionsFunction<Row, ManifestFile> toManifests(
- Broadcast<Table> tableBroadcast,
- long maxNumManifestEntries,
- String location,
- int format,
- Types.StructType combinedPartitionType,
- PartitionSpec spec,
- StructType sparkType) {
+ private ManifestWriter<DataFile> newManifestWriter() {
+ return ManifestFiles.write(formatVersion, spec(), newOutputFile(), null);
+ }
- return rows -> {
- List<Row> rowsAsList = Lists.newArrayList(rows);
+ private PartitionSpec spec() {
+ return table().specs().get(specId);
+ }
- if (rowsAsList.isEmpty()) {
- return Collections.emptyIterator();
- }
+ private OutputFile newOutputFile() {
+ return table().io().newOutputFile(newManifestLocation());
+ }
- List<ManifestFile> manifests = Lists.newArrayList();
- if (rowsAsList.size() <= maxNumManifestEntries) {
- manifests.add(
- writeManifest(
- rowsAsList,
- 0,
- rowsAsList.size(),
- tableBroadcast,
- location,
- format,
- combinedPartitionType,
- spec,
- sparkType));
- } else {
- int midIndex = rowsAsList.size() / 2;
- manifests.add(
- writeManifest(
- rowsAsList,
- 0,
- midIndex,
- tableBroadcast,
- location,
- format,
- combinedPartitionType,
- spec,
- sparkType));
- manifests.add(
- writeManifest(
- rowsAsList,
- midIndex,
- rowsAsList.size(),
- tableBroadcast,
- location,
- format,
- combinedPartitionType,
- spec,
- sparkType));
- }
+ private String newManifestLocation() {
+ String fileName = FileFormat.AVRO.addExtension("optimized-m-" +
UUID.randomUUID());
+ Path filePath = new Path(outputLocation, fileName);
+ return filePath.toString();
+ }
- return manifests.iterator();
- };
+ private Table table() {
+ return tableBroadcast.value();
+ }
}
}
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 4ce5ba4e9d..9ae1954d68 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -34,9 +34,13 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.ManifestWriter;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
@@ -45,6 +49,7 @@ import org.apache.iceberg.TableProperties;
import org.apache.iceberg.actions.RewriteManifests;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -412,16 +417,12 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
options.put(TableProperties.SNAPSHOT_ID_INHERITANCE_ENABLED,
snapshotIdInheritanceEnabled);
Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
- // all records belong to the same partition
- List<ThreeColumnRecord> records = Lists.newArrayList();
- for (int i = 0; i < 50; i++) {
- records.add(new ThreeColumnRecord(i, String.valueOf(i), "0"));
+ List<DataFile> dataFiles = Lists.newArrayList();
+ for (int fileOrdinal = 0; fileOrdinal < 1000; fileOrdinal++) {
+ dataFiles.add(newDataFile(table, "c3=" + fileOrdinal));
}
- Dataset<Row> df = spark.createDataFrame(records, ThreeColumnRecord.class);
- // repartition to create separate files
- writeDF(df.repartition(50, df.col("c1")));
-
- table.refresh();
+ ManifestFile appendManifest = writeManifest(table, dataFiles);
+ table.newFastAppend().appendManifest(appendManifest).commit();
List<ManifestFile> manifests =
table.currentSnapshot().allManifests(table.io());
Assert.assertEquals("Should have 1 manifests before rewrite", 1,
manifests.size());
@@ -446,22 +447,14 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
.stagingLocation(stagingLocation)
.execute();
- Assert.assertEquals(
- "Action should rewrite 1 manifest", 1,
Iterables.size(result.rewrittenManifests()));
- Assert.assertEquals(
- "Action should add 2 manifests", 2,
Iterables.size(result.addedManifests()));
+ Assertions.assertThat(result.rewrittenManifests()).hasSize(1);
+
Assertions.assertThat(result.addedManifests()).hasSizeGreaterThanOrEqualTo(2);
assertManifestsLocation(result.addedManifests(), stagingLocation);
table.refresh();
List<ManifestFile> newManifests =
table.currentSnapshot().allManifests(table.io());
- Assert.assertEquals("Should have 2 manifests after rewrite", 2,
newManifests.size());
-
- Dataset<Row> resultDF = spark.read().format("iceberg").load(tableLocation);
- List<ThreeColumnRecord> actualRecords =
- resultDF.sort("c1",
"c2").as(Encoders.bean(ThreeColumnRecord.class)).collectAsList();
-
- Assert.assertEquals("Rows must match", records, actualRecords);
+ Assertions.assertThat(newManifests).hasSizeGreaterThanOrEqualTo(2);
}
@Test
@@ -643,4 +636,32 @@ public class TestRewriteManifestsAction extends
SparkTestBase {
assertThat(manifests).allMatch(manifest ->
manifest.path().startsWith(tableLocation));
}
}
+
+ private ManifestFile writeManifest(Table table, List<DataFile> files) throws
IOException {
+ File manifestFile = temp.newFile("generated-manifest.avro");
+ Assert.assertTrue(manifestFile.delete());
+ OutputFile outputFile =
table.io().newOutputFile(manifestFile.getCanonicalPath());
+
+ ManifestWriter<DataFile> writer =
+ ManifestFiles.write(formatVersion, table.spec(), outputFile, null);
+
+ try {
+ for (DataFile file : files) {
+ writer.add(file);
+ }
+ } finally {
+ writer.close();
+ }
+
+ return writer.toManifestFile();
+ }
+
+ private DataFile newDataFile(Table table, String partitionPath) {
+ return DataFiles.builder(table.spec())
+ .withPath("/path/to/data-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(10)
+ .withPartitionPath(partitionPath)
+ .withRecordCount(1)
+ .build();
+ }
}