This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 592b3b1c51 Spark 3.5: Preserve content offset and size during manifest rewrites (#11469)
592b3b1c51 is described below

commit 592b3b1c51fb0302bc4e970fccfc462a49009ad2
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Nov 5 08:50:39 2024 +0100

    Spark 3.5: Preserve content offset and size during manifest rewrites (#11469)
---
 .../org/apache/iceberg/spark/SparkContentFile.java | 18 ++++++++
 .../spark/actions/TestRewriteManifestsAction.java  | 48 ++++++++++++++++++----
 2 files changed, 59 insertions(+), 7 deletions(-)

diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index af7c4a9b86..bad31d8d85 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -55,6 +55,8 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
   private final int fileSpecIdPosition;
   private final int equalityIdsPosition;
   private final int referencedDataFilePosition;
+  private final int contentOffsetPosition;
+  private final int contentSizePosition;
   private final Type lowerBoundsType;
   private final Type upperBoundsType;
   private final Type keyMetadataType;
@@ -105,6 +107,8 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
     this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
     this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
     this.referencedDataFilePosition = positions.get(DataFile.REFERENCED_DATA_FILE.name());
+    this.contentOffsetPosition = positions.get(DataFile.CONTENT_OFFSET.name());
+    this.contentSizePosition = positions.get(DataFile.CONTENT_SIZE.name());
   }
 
   public F wrap(Row row) {
@@ -240,6 +244,20 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
     return wrapped.getString(referencedDataFilePosition);
   }
 
+  public Long contentOffset() {
+    if (wrapped.isNullAt(contentOffsetPosition)) {
+      return null;
+    }
+    return wrapped.getLong(contentOffsetPosition);
+  }
+
+  public Long contentSizeInBytes() {
+    if (wrapped.isNullAt(contentSizePosition)) {
+      return null;
+    }
+    return wrapped.getLong(contentSizePosition);
+  }
+
   private int fieldPosition(String name, StructType sparkType) {
     try {
       return sparkType.fieldIndex(name);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 6cbc53baa3..b86d74415a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -106,7 +106,8 @@ public class TestRewriteManifestsAction extends TestBase {
       new Object[] {"true", "true", false, 1},
       new Object[] {"false", "true", true, 1},
       new Object[] {"true", "false", false, 2},
-      new Object[] {"false", "false", false, 2}
+      new Object[] {"false", "false", false, 2},
+      new Object[] {"false", "false", false, 3}
     };
   }
 
@@ -150,16 +151,16 @@ public class TestRewriteManifestsAction extends TestBase {
         .appendFile(dataFile3)
         .commit();
 
-    DeleteFile deleteFile1 = newDeleteFileWithRef(table, dataFile1);
-    assertThat(deleteFile1.referencedDataFile()).isEqualTo(dataFile1.location());
+    DeleteFile deleteFile1 = newDeletes(table, dataFile1);
+    assertDeletes(dataFile1, deleteFile1);
     table.newRowDelta().addDeletes(deleteFile1).commit();
 
-    DeleteFile deleteFile2 = newDeleteFileWithRef(table, dataFile2);
-    assertThat(deleteFile2.referencedDataFile()).isEqualTo(dataFile2.location());
+    DeleteFile deleteFile2 = newDeletes(table, dataFile2);
+    assertDeletes(dataFile2, deleteFile2);
     table.newRowDelta().addDeletes(deleteFile2).commit();
 
-    DeleteFile deleteFile3 = newDeleteFileWithRef(table, dataFile3);
-    assertThat(deleteFile3.referencedDataFile()).isEqualTo(dataFile3.location());
+    DeleteFile deleteFile3 = newDeletes(table, dataFile3);
+    assertDeletes(dataFile3, deleteFile3);
     table.newRowDelta().addDeletes(deleteFile3).commit();
 
     SparkActions actions = SparkActions.get();
@@ -178,10 +179,13 @@ public class TestRewriteManifestsAction extends TestBase {
         DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes());
         if (dataFile.location().equals(dataFile1.location())) {
           assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile1.referencedDataFile());
+          assertEqual(deleteFile, deleteFile1);
         } else if (dataFile.location().equals(dataFile2.location())) {
           assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
+          assertEqual(deleteFile, deleteFile2);
         } else {
           assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile3.referencedDataFile());
+          assertEqual(deleteFile, deleteFile3);
         }
       }
     }
@@ -1035,10 +1039,18 @@ public class TestRewriteManifestsAction extends TestBase {
         .withRecordCount(1);
   }
 
+  private DeleteFile newDeletes(Table table, DataFile dataFile) {
+    return formatVersion >= 3 ? newDV(table, dataFile) : newDeleteFileWithRef(table, dataFile);
+  }
+
   private DeleteFile newDeleteFileWithRef(Table table, DataFile dataFile) {
     return FileGenerationUtil.generatePositionDeleteFileWithRef(table, dataFile);
   }
 
+  private DeleteFile newDV(Table table, DataFile dataFile) {
+    return FileGenerationUtil.generateDV(table, dataFile);
+  }
+
   private DeleteFile newDeleteFile(Table table, String partitionPath) {
     return FileMetadata.deleteFileBuilder(table.spec())
         .ofPositionDeletes()
@@ -1097,4 +1109,26 @@ public class TestRewriteManifestsAction extends TestBase {
     OutputFile outputFile = Files.localOutput(File.createTempFile("junit", null, temp.toFile()));
     return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes, deleteSchema);
   }
+
+  private void assertDeletes(DataFile dataFile, DeleteFile deleteFile) {
+    assertThat(deleteFile.referencedDataFile()).isEqualTo(dataFile.location());
+    if (formatVersion >= 3) {
+      assertThat(deleteFile.contentOffset()).isNotNull();
+      assertThat(deleteFile.contentSizeInBytes()).isNotNull();
+    } else {
+      assertThat(deleteFile.contentOffset()).isNull();
+      assertThat(deleteFile.contentSizeInBytes()).isNull();
+    }
+  }
+
+  private void assertEqual(DeleteFile deleteFile1, DeleteFile deleteFile2) {
+    assertThat(deleteFile1.location()).isEqualTo(deleteFile2.location());
+    assertThat(deleteFile1.content()).isEqualTo(deleteFile2.content());
+    assertThat(deleteFile1.specId()).isEqualTo(deleteFile2.specId());
+    assertThat(deleteFile1.partition()).isEqualTo(deleteFile2.partition());
+    assertThat(deleteFile1.format()).isEqualTo(deleteFile2.format());
+    assertThat(deleteFile1.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
+    assertThat(deleteFile1.contentOffset()).isEqualTo(deleteFile2.contentOffset());
+    assertThat(deleteFile1.contentSizeInBytes()).isEqualTo(deleteFile2.contentSizeInBytes());
+  }
 }

Reply via email to