This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new d0cca384a0 Spark 3.5: Preserve data file reference during manifest rewrites (#11457)
d0cca384a0 is described below

commit d0cca384a01172b5133bf7e207d94e374ed0c2ed
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Mon Nov 4 21:22:50 2024 +0100

    Spark 3.5: Preserve data file reference during manifest rewrites (#11457)
---
 .../org/apache/iceberg/FileGenerationUtil.java     | 17 ++++++
 .../org/apache/iceberg/spark/SparkContentFile.java |  9 ++++
 .../spark/actions/TestRewriteManifestsAction.java  | 63 ++++++++++++++++++++++
 3 files changed, 89 insertions(+)

diff --git a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
index b210cfcd4f..4f85151c80 100644
--- a/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
+++ b/core/src/test/java/org/apache/iceberg/FileGenerationUtil.java
@@ -136,6 +136,23 @@ public class FileGenerationUtil {
         .build();
   }
 
+  public static DeleteFile generatePositionDeleteFileWithRef(Table table, DataFile dataFile) {
+    PartitionSpec spec = table.specs().get(dataFile.specId());
+    StructLike partition = dataFile.partition();
+    LocationProvider locations = table.locationProvider();
+    String path = locations.newDataLocation(spec, partition, generateFileName());
+    long fileSize = generateFileSize();
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofPositionDeletes()
+        .withPath(path)
+        .withPartition(partition)
+        .withFileSizeInBytes(fileSize)
+        .withFormat(FileFormat.PARQUET)
+        .withReferencedDataFile(dataFile.location())
+        .withRecordCount(3)
+        .build();
+  }
+
   // mimics the behavior of OutputFileFactory
   public static String generateFileName() {
     int partitionId = random().nextInt(100_000);
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index 99586f2503..af7c4a9b86 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -54,6 +54,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
   private final int sortOrderIdPosition;
   private final int fileSpecIdPosition;
   private final int equalityIdsPosition;
+  private final int referencedDataFilePosition;
   private final Type lowerBoundsType;
   private final Type upperBoundsType;
   private final Type keyMetadataType;
@@ -103,6 +104,7 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
     this.sortOrderIdPosition = positions.get(DataFile.SORT_ORDER_ID.name());
     this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
     this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
+    this.referencedDataFilePosition = positions.get(DataFile.REFERENCED_DATA_FILE.name());
   }
 
   public F wrap(Row row) {
@@ -231,6 +233,13 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
     return wrapped.isNullAt(equalityIdsPosition) ? null : wrapped.getList(equalityIdsPosition);
   }
 
+  public String referencedDataFile() {
+    if (wrapped.isNullAt(referencedDataFilePosition)) {
+      return null;
+    }
+    return wrapped.getString(referencedDataFilePosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index a449de414a..6cbc53baa3 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -41,7 +41,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileGenerationUtil;
 import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
 import org.apache.iceberg.Files;
 import org.apache.iceberg.ManifestContent;
 import org.apache.iceberg.ManifestFile;
@@ -64,6 +66,7 @@ import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.exceptions.CommitStateUnknownException;
 import org.apache.iceberg.hadoop.HadoopTables;
+import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.OutputFile;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
@@ -128,6 +131,62 @@ public class TestRewriteManifestsAction extends TestBase {
     this.tableLocation = tableDir.toURI().toString();
   }
 
+  @TestTemplate
+  public void testRewriteManifestsPreservesOptionalFields() throws IOException {
+    assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+
+    PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
+    Map<String, String> options = Maps.newHashMap();
+    options.put(TableProperties.FORMAT_VERSION, String.valueOf(formatVersion));
+    Table table = TABLES.create(SCHEMA, spec, options, tableLocation);
+
+    DataFile dataFile1 = newDataFile(table, "c1=0");
+    DataFile dataFile2 = newDataFile(table, "c1=0");
+    DataFile dataFile3 = newDataFile(table, "c1=0");
+    table
+        .newFastAppend()
+        .appendFile(dataFile1)
+        .appendFile(dataFile2)
+        .appendFile(dataFile3)
+        .commit();
+
+    DeleteFile deleteFile1 = newDeleteFileWithRef(table, dataFile1);
+    assertThat(deleteFile1.referencedDataFile()).isEqualTo(dataFile1.location());
+    table.newRowDelta().addDeletes(deleteFile1).commit();
+
+    DeleteFile deleteFile2 = newDeleteFileWithRef(table, dataFile2);
+    assertThat(deleteFile2.referencedDataFile()).isEqualTo(dataFile2.location());
+    table.newRowDelta().addDeletes(deleteFile2).commit();
+
+    DeleteFile deleteFile3 = newDeleteFileWithRef(table, dataFile3);
+    assertThat(deleteFile3.referencedDataFile()).isEqualTo(dataFile3.location());
+    table.newRowDelta().addDeletes(deleteFile3).commit();
+
+    SparkActions actions = SparkActions.get();
+
+    actions
+        .rewriteManifests(table)
+        .rewriteIf(manifest -> true)
+        .option(RewriteManifestsSparkAction.USE_CACHING, useCaching)
+        .execute();
+
+    table.refresh();
+
+    try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+      for (FileScanTask fileTask : tasks) {
+        DataFile dataFile = fileTask.file();
+        DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes());
+        if (dataFile.location().equals(dataFile1.location())) {
+          assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile1.referencedDataFile());
+        } else if (dataFile.location().equals(dataFile2.location())) {
+          assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
+        } else {
+          assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile3.referencedDataFile());
+        }
+      }
+    }
+  }
+
   @TestTemplate
   public void testRewriteManifestsEmptyTable() throws IOException {
     PartitionSpec spec = PartitionSpec.unpartitioned();
@@ -976,6 +1035,10 @@ public class TestRewriteManifestsAction extends TestBase {
         .withRecordCount(1);
   }
 
+  private DeleteFile newDeleteFileWithRef(Table table, DataFile dataFile) {
+    return FileGenerationUtil.generatePositionDeleteFileWithRef(table, dataFile);
+  }
+
   private DeleteFile newDeleteFile(Table table, String partitionPath) {
     return FileMetadata.deleteFileBuilder(table.spec())
         .ofPositionDeletes()

Reply via email to