This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 592b3b1c51 Spark 3.5: Preserve content offset and size during manifest rewrites (#11469)
592b3b1c51 is described below
commit 592b3b1c51fb0302bc4e970fccfc462a49009ad2
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Nov 5 08:50:39 2024 +0100
Spark 3.5: Preserve content offset and size during manifest rewrites (#11469)
---
.../org/apache/iceberg/spark/SparkContentFile.java | 18 ++++++++
.../spark/actions/TestRewriteManifestsAction.java | 48 ++++++++++++++++++----
2 files changed, 59 insertions(+), 7 deletions(-)
diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
index af7c4a9b86..bad31d8d85 100644
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
+++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkContentFile.java
@@ -55,6 +55,8 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
private final int fileSpecIdPosition;
private final int equalityIdsPosition;
private final int referencedDataFilePosition;
+ private final int contentOffsetPosition;
+ private final int contentSizePosition;
private final Type lowerBoundsType;
private final Type upperBoundsType;
private final Type keyMetadataType;
@@ -105,6 +107,8 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
this.fileSpecIdPosition = positions.get(DataFile.SPEC_ID.name());
this.equalityIdsPosition = positions.get(DataFile.EQUALITY_IDS.name());
this.referencedDataFilePosition =
positions.get(DataFile.REFERENCED_DATA_FILE.name());
+ this.contentOffsetPosition = positions.get(DataFile.CONTENT_OFFSET.name());
+ this.contentSizePosition = positions.get(DataFile.CONTENT_SIZE.name());
}
public F wrap(Row row) {
@@ -240,6 +244,20 @@ public abstract class SparkContentFile<F> implements ContentFile<F> {
return wrapped.getString(referencedDataFilePosition);
}
+ public Long contentOffset() {
+ if (wrapped.isNullAt(contentOffsetPosition)) {
+ return null;
+ }
+ return wrapped.getLong(contentOffsetPosition);
+ }
+
+ public Long contentSizeInBytes() {
+ if (wrapped.isNullAt(contentSizePosition)) {
+ return null;
+ }
+ return wrapped.getLong(contentSizePosition);
+ }
+
private int fieldPosition(String name, StructType sparkType) {
try {
return sparkType.fieldIndex(name);
diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
index 6cbc53baa3..b86d74415a 100644
--- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
+++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/actions/TestRewriteManifestsAction.java
@@ -106,7 +106,8 @@ public class TestRewriteManifestsAction extends TestBase {
new Object[] {"true", "true", false, 1},
new Object[] {"false", "true", true, 1},
new Object[] {"true", "false", false, 2},
- new Object[] {"false", "false", false, 2}
+ new Object[] {"false", "false", false, 2},
+ new Object[] {"false", "false", false, 3}
};
}
@@ -150,16 +151,16 @@ public class TestRewriteManifestsAction extends TestBase {
.appendFile(dataFile3)
.commit();
- DeleteFile deleteFile1 = newDeleteFileWithRef(table, dataFile1);
-
assertThat(deleteFile1.referencedDataFile()).isEqualTo(dataFile1.location());
+ DeleteFile deleteFile1 = newDeletes(table, dataFile1);
+ assertDeletes(dataFile1, deleteFile1);
table.newRowDelta().addDeletes(deleteFile1).commit();
- DeleteFile deleteFile2 = newDeleteFileWithRef(table, dataFile2);
-
assertThat(deleteFile2.referencedDataFile()).isEqualTo(dataFile2.location());
+ DeleteFile deleteFile2 = newDeletes(table, dataFile2);
+ assertDeletes(dataFile2, deleteFile2);
table.newRowDelta().addDeletes(deleteFile2).commit();
- DeleteFile deleteFile3 = newDeleteFileWithRef(table, dataFile3);
-
assertThat(deleteFile3.referencedDataFile()).isEqualTo(dataFile3.location());
+ DeleteFile deleteFile3 = newDeletes(table, dataFile3);
+ assertDeletes(dataFile3, deleteFile3);
table.newRowDelta().addDeletes(deleteFile3).commit();
SparkActions actions = SparkActions.get();
@@ -178,10 +179,13 @@ public class TestRewriteManifestsAction extends TestBase {
DeleteFile deleteFile = Iterables.getOnlyElement(fileTask.deletes());
if (dataFile.location().equals(dataFile1.location())) {
assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile1.referencedDataFile());
+ assertEqual(deleteFile, deleteFile1);
} else if (dataFile.location().equals(dataFile2.location())) {
assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
+ assertEqual(deleteFile, deleteFile2);
} else {
assertThat(deleteFile.referencedDataFile()).isEqualTo(deleteFile3.referencedDataFile());
+ assertEqual(deleteFile, deleteFile3);
}
}
}
@@ -1035,10 +1039,18 @@ public class TestRewriteManifestsAction extends TestBase {
.withRecordCount(1);
}
+ private DeleteFile newDeletes(Table table, DataFile dataFile) {
+ return formatVersion >= 3 ? newDV(table, dataFile) :
newDeleteFileWithRef(table, dataFile);
+ }
+
private DeleteFile newDeleteFileWithRef(Table table, DataFile dataFile) {
return FileGenerationUtil.generatePositionDeleteFileWithRef(table,
dataFile);
}
+ private DeleteFile newDV(Table table, DataFile dataFile) {
+ return FileGenerationUtil.generateDV(table, dataFile);
+ }
+
private DeleteFile newDeleteFile(Table table, String partitionPath) {
return FileMetadata.deleteFileBuilder(table.spec())
.ofPositionDeletes()
@@ -1097,4 +1109,26 @@ public class TestRewriteManifestsAction extends TestBase {
OutputFile outputFile = Files.localOutput(File.createTempFile("junit",
null, temp.toFile()));
return FileHelpers.writeDeleteFile(table, outputFile, partition, deletes,
deleteSchema);
}
+
+ private void assertDeletes(DataFile dataFile, DeleteFile deleteFile) {
+ assertThat(deleteFile.referencedDataFile()).isEqualTo(dataFile.location());
+ if (formatVersion >= 3) {
+ assertThat(deleteFile.contentOffset()).isNotNull();
+ assertThat(deleteFile.contentSizeInBytes()).isNotNull();
+ } else {
+ assertThat(deleteFile.contentOffset()).isNull();
+ assertThat(deleteFile.contentSizeInBytes()).isNull();
+ }
+ }
+
+ private void assertEqual(DeleteFile deleteFile1, DeleteFile deleteFile2) {
+ assertThat(deleteFile1.location()).isEqualTo(deleteFile2.location());
+ assertThat(deleteFile1.content()).isEqualTo(deleteFile2.content());
+ assertThat(deleteFile1.specId()).isEqualTo(deleteFile2.specId());
+ assertThat(deleteFile1.partition()).isEqualTo(deleteFile2.partition());
+ assertThat(deleteFile1.format()).isEqualTo(deleteFile2.format());
+
assertThat(deleteFile1.referencedDataFile()).isEqualTo(deleteFile2.referencedDataFile());
+
assertThat(deleteFile1.contentOffset()).isEqualTo(deleteFile2.contentOffset());
+
assertThat(deleteFile1.contentSizeInBytes()).isEqualTo(deleteFile2.contentSizeInBytes());
+ }
}