This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
     new d0cca384a0 Spark 3.5: Preserve data file reference during manifest rewrites (#11457)
d0cca384a0 is described below
commit d0cca384a01172b5133bf7e207d94e374ed0c2ed
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Nov 4 21:22:50 2024 +0100
Spark 3.5: Preserve data file reference during manifest rewrites (#11457)
---
.../org/apache/iceberg/FileGenerationUtil.java | 17 ++++++
.../org/apache/iceberg/spark/SparkContentFile.java | 9 ++++
.../spark/actions/TestRewriteManifestsAction.java | 63 ++++++++++++++++++++++
3 files changed, 89 insertions(+)
diff --git a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
index b210cfcd4f..4f85151c80 100644
--- a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
+++ b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
@@ -136,6 +136,23 @@ public class FileGenerationUtil {
.build();
}
+  public static DeleteFile generatePositionDeleteFileWithRef(Table table, DataFile dataFile) {
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ StructLike partition = dataFile.partition();
+ LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, partition, generateFileName());
+ long fileSize = generateFileSize();
+ return FileMetadata.deleteFileBuilder(spec)
+ .ofPositionDeletes()
+ .withPath(path)
+ .withPartition(partition)
+ .withFileSizeInBytes(fileSize)
+ .withFormat(FileFormat.PARQUET)
+ .withReferencedDataFile(dataFile.location())
+ .withRecordCount(3)
+ .build();
+ }
+
// mimics the behavior of OutputFileFactory
public static String generateFileName() {
int partitionId = random().nextInt(100_000);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index 99586f2503..af7c4a9b86 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -54,6 +54,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
private final int sortOrderIdPosition;
private final int fileSpecIdPosition;
private final int equalityIdsPosition;
+ private final int referencedDataFilePosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
private final Type keyMetadataType;
@@ -103,6 +104,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
+    this.referencedDataFilePosition = positions.get(DataFile.REFERENCED_DATA_FILE.name());
}
public F wrap(Row row) {
@@ -231,6 +233,13 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
     return wrapped.isNullAt(equalityIdsPosition) ? null : wrapped.getList(equalityIdsPosition);
}
+ public String referencedDataFile() {
+ if (wrapped.isNullAt(referencedDataFilePosition)) {
+ return null;
+ }
+ return wrapped.getString(referencedDataFilePosition);
+ }
+
private int fieldPosition(String name, StructType sparkType) {
try {
return sparkType.fieldIndex(name);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index a449de414a..6cbc53baa3 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -41,7 +41,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileGenerationUtil;
import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Files;
import org.apache.iceberg.ManifestContent;
import org.apache.iceberg.ManifestFile;
@@ -64,6 +66,7 @@ import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.exceptions.CommitStateUnknownException;
import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -128,6 +131,62 @@ public class TestRewriteManifestsAction extends TestBase {
this.tableLocation = tableDir.toURI().toString();
}
+ @TestTemplate
+  public void testRewriteManifestsPreservesOptionalFields() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+ Map<String, String> options = Maps.newHashMap();
+ options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+ Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+ DataFile dataFile1 = newDataFile(table, "c1=0");
+ DataFile dataFile2 = newDataFile(table, "c1=0");
+ DataFile dataFile3 = newDataFile(table, "c1=0");
+ table
+ .newFastAppend()
+ .appendFile(dataFile1)
+ .appendFile(dataFile2)
+ .appendFile(dataFile3)
+ .commit();
+
+ DeleteFile deleteFile1 = newDeleteFileWithRef(table, dataFile1);
+    assertThat(deleteFile1.referencedDataFile()).isEqualTo(dataFile1.location());
+ table.newRowDelta().addDeletes(deleteFile1).commit();
+
+ DeleteFile deleteFile2 = newDeleteFileWithRef(table, dataFile2);
+    assertThat(deleteFile2.referencedDataFile()).isEqualTo(dataFile2.location());
+ table.newRowDelta().addDeletes(deleteFile2).commit();
+
+ DeleteFile deleteFile3 = newDeleteFileWithRef(table, dataFile3);
+    assertThat(deleteFile3.referencedDataFile()).isEqualTo(dataFile3.location());
+ table.newRowDelta().addDeletes(deleteFile3).commit();
+
+ SparkActions actions = SparkActions.get();
+
+ actions
+ .rewriteManifests(table)
+ .rewriteIf(manifest -> true)
+ .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+ .execute();
+
+ table.refresh();
+
+ try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+ for (FileScanTask fileTask : tasks) {
+ DataFile dataFile = fileTask.file();
+ DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes());
+ if (dataFile.location().equals(dataFile1.location())) {
+          assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile1.referencedDataFile());
+ } else if (dataFile.location().equals(dataFile2.location())) {
+          assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
+ } else {
+          assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile3.referencedDataFile());
+ }
+ }
+ }
+ }
+
@TestTemplate
public void testRewriteManifestsEmptyTable() throws IOException {
PartitionSpec spec = PartitionSpec.unpartitioned();
@@ -976,6 +1035,10 @@ public class TestRewriteManifestsAction extends TestBase {
.withRecordCount(1);
}
+ private DeleteFile newDeleteFileWithRef(Table table, DataFile dataFile) {
+    return FileGenerationUtil.generatePositionDeleteFileWithRef(table, dataFile);
+ }
+
private DeleteFile newDeleteFile(Table table, String partitionPath) {
return FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()