This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new d9b9768766 API, Core: Add data file reference to DeleteFile (#11443)
d9b9768766 is described below
commit d9b9768766b359adf696f5dc9e321507bd0213d2
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Sat Nov 2 17:23:28 2024 +0100
API, Core: Add data file reference to DeleteFile (#11443)
---
api/src/main/java/org/apache/iceberg/DataFile.java | 11 ++++++--
.../main/java/org/apache/iceberg/DeleteFile.java | 11 ++++++++
.../src/main/java/org/apache/iceberg/BaseFile.java | 17 +++++++++++-
.../src/main/java/org/apache/iceberg/BaseScan.java | 1 +
.../java/org/apache/iceberg/ContentFileParser.java | 13 ++++++++-
.../main/java/org/apache/iceberg/FileMetadata.java | 13 ++++++++-
.../java/org/apache/iceberg/GenericDataFile.java | 3 +-
.../java/org/apache/iceberg/GenericDeleteFile.java | 6 ++--
.../java/org/apache/iceberg/SnapshotProducer.java | 5 ++++
.../main/java/org/apache/iceberg/V2Metadata.java | 9 +++++-
.../main/java/org/apache/iceberg/V3Metadata.java | 9 +++++-
.../org/apache/iceberg/util/ContentFileUtil.java | 4 +++
.../src/test/java/org/apache/iceberg/TestBase.java | 12 ++++++++
.../org/apache/iceberg/TestContentFileParser.java | 32 ++++++++++++++++++++--
.../org/apache/iceberg/TestManifestEncryption.java | 3 +-
.../org/apache/iceberg/TestManifestReader.java | 22 +++++++++++++--
.../apache/iceberg/TestManifestWriterVersions.java | 1 +
17 files changed, 157 insertions(+), 15 deletions(-)
diff --git a/api/src/main/java/org/apache/iceberg/DataFile.java b/api/src/main/java/org/apache/iceberg/DataFile.java
index 02ad0aff31..3c6d77f34d 100644
--- a/api/src/main/java/org/apache/iceberg/DataFile.java
+++ b/api/src/main/java/org/apache/iceberg/DataFile.java
@@ -98,12 +98,18 @@ public interface DataFile extends ContentFile<DataFile> {
Types.NestedField SORT_ORDER_ID =
optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
+ Types.NestedField REFERENCED_DATA_FILE =
+ optional(
+ 143,
+ "referenced_data_file",
+ StringType.get(),
+ "Fully qualified location (URI with FS scheme) of a data file that all deletes reference");
int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";
- // NEXT ID TO ASSIGN: 142
+ // NEXT ID TO ASSIGN: 144
static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
@@ -124,7 +130,8 @@ public interface DataFile extends ContentFile<DataFile> {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
- SORT_ORDER_ID);
+ SORT_ORDER_ID,
+ REFERENCED_DATA_FILE);
}
/**
diff --git a/api/src/main/java/org/apache/iceberg/DeleteFile.java b/api/src/main/java/org/apache/iceberg/DeleteFile.java
index 0f8087e6a0..8e17e60fcc 100644
--- a/api/src/main/java/org/apache/iceberg/DeleteFile.java
+++ b/api/src/main/java/org/apache/iceberg/DeleteFile.java
@@ -31,4 +31,15 @@ public interface DeleteFile extends ContentFile<DeleteFile> {
default List<Long> splitOffsets() {
return null;
}
+
+ /**
+ * Returns the location of a data file that all deletes reference.
+ *
+ * <p>The referenced data file is required for deletion vectors and may be optionally captured for
+ * position delete files that apply to only one data file. This method always returns null for
+ * equality delete files.
+ */
+ default String referencedDataFile() {
+ return null;
+ }
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseFile.java b/core/src/main/java/org/apache/iceberg/BaseFile.java
index 8f84eb5737..f4fd94724e 100644
--- a/core/src/main/java/org/apache/iceberg/BaseFile.java
+++ b/core/src/main/java/org/apache/iceberg/BaseFile.java
@@ -80,6 +80,7 @@ abstract class BaseFile<F> extends SupportsIndexProjection
private int[] equalityIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;
+ private String referencedDataFile = null;
// cached schema
private transient Schema avroSchema = null;
@@ -108,6 +109,7 @@ abstract class BaseFile<F> extends SupportsIndexProjection
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID,
+ DataFile.REFERENCED_DATA_FILE,
MetadataColumns.ROW_POSITION);
/** Used by Avro reflection to instantiate this class when reading manifest files. */
@@ -149,7 +151,8 @@ abstract class BaseFile<F> extends SupportsIndexProjection
List<Long> splitOffsets,
int[] equalityFieldIds,
Integer sortOrderId,
- ByteBuffer keyMetadata) {
+ ByteBuffer keyMetadata,
+ String referencedDataFile) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
@@ -178,6 +181,7 @@ abstract class BaseFile<F> extends SupportsIndexProjection
this.equalityIds = equalityFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
+ this.referencedDataFile = referencedDataFile;
}
/**
@@ -230,6 +234,7 @@ abstract class BaseFile<F> extends SupportsIndexProjection
this.sortOrderId = toCopy.sortOrderId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
+ this.referencedDataFile = toCopy.referencedDataFile;
}
/** Constructor for Java serialization. */
@@ -339,6 +344,9 @@ abstract class BaseFile<F> extends SupportsIndexProjection
this.sortOrderId = (Integer) value;
return;
case 17:
+ this.referencedDataFile = value != null ? value.toString() : null;
+ return;
+ case 18:
this.fileOrdinal = (long) value;
return;
default:
@@ -388,6 +396,8 @@ abstract class BaseFile<F> extends SupportsIndexProjection
case 16:
return sortOrderId;
case 17:
+ return referencedDataFile;
+ case 18:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
@@ -514,6 +524,10 @@ abstract class BaseFile<F> extends SupportsIndexProjection
return sortOrderId;
}
+ public String referencedDataFile() {
+ return referencedDataFile;
+ }
+
private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
return keys == null ? SerializableMap.copyOf(map) : SerializableMap.filteredCopyOf(map, keys);
}
@@ -565,6 +579,7 @@ abstract class BaseFile<F> extends SupportsIndexProjection
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
+ .add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.toString();
}
}
diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java
index 804df01d31..a011d03d59 100644
--- a/core/src/main/java/org/apache/iceberg/BaseScan.java
+++ b/core/src/main/java/org/apache/iceberg/BaseScan.java
@@ -77,6 +77,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"partition",
"key_metadata",
"split_offsets",
+ "referenced_data_file",
"equality_ids");
protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
diff --git a/core/src/main/java/org/apache/iceberg/ContentFileParser.java b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
index dd08c5c69e..96dfa5586c 100644
--- a/core/src/main/java/org/apache/iceberg/ContentFileParser.java
+++ b/core/src/main/java/org/apache/iceberg/ContentFileParser.java
@@ -45,6 +45,7 @@ class ContentFileParser {
private static final String SPLIT_OFFSETS = "split-offsets";
private static final String EQUALITY_IDS = "equality-ids";
private static final String SORT_ORDER_ID = "sort-order-id";
+ private static final String REFERENCED_DATA_FILE = "referenced-data-file";
private ContentFileParser() {}
@@ -109,6 +110,14 @@ class ContentFileParser {
generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
}
+ if (contentFile instanceof DeleteFile) {
+ DeleteFile deleteFile = (DeleteFile) contentFile;
+
+ if (deleteFile.referencedDataFile() != null) {
+ generator.writeStringField(REFERENCED_DATA_FILE, deleteFile.referencedDataFile());
+ }
+ }
+
generator.writeEndObject();
}
@@ -145,6 +154,7 @@ class ContentFileParser {
List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode);
int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode);
Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);
+ String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode);
if (fileContent == FileContent.DATA) {
return new GenericDataFile(
@@ -169,7 +179,8 @@ class ContentFileParser {
equalityFieldIds,
sortOrderId,
splitOffsets,
- keyMetadata);
+ keyMetadata,
+ referencedDataFile);
}
}
diff --git a/core/src/main/java/org/apache/iceberg/FileMetadata.java b/core/src/main/java/org/apache/iceberg/FileMetadata.java
index 9a201d1b3b..ef229593bc 100644
--- a/core/src/main/java/org/apache/iceberg/FileMetadata.java
+++ b/core/src/main/java/org/apache/iceberg/FileMetadata.java
@@ -59,6 +59,7 @@ public class FileMetadata {
private ByteBuffer keyMetadata = null;
private Integer sortOrderId = null;
private List<Long> splitOffsets = null;
+ private String referencedDataFile = null;
Builder(PartitionSpec spec) {
this.spec = spec;
@@ -220,6 +221,15 @@ public class FileMetadata {
return this;
}
+ public Builder withReferencedDataFile(CharSequence newReferencedDataFile) {
+ if (newReferencedDataFile != null) {
+ this.referencedDataFile = newReferencedDataFile.toString();
+ } else {
+ this.referencedDataFile = null;
+ }
+ return this;
+ }
+
public DeleteFile build() {
Preconditions.checkArgument(filePath != null, "File path is required");
if (format == null) {
@@ -262,7 +272,8 @@ public class FileMetadata {
equalityFieldIds,
sortOrderId,
splitOffsets,
- keyMetadata);
+ keyMetadata,
+ referencedDataFile);
}
}
}
diff --git a/core/src/main/java/org/apache/iceberg/GenericDataFile.java b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
index 7b99e7b60a..aa34cd22cd 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDataFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDataFile.java
@@ -64,7 +64,8 @@ class GenericDataFile extends BaseFile<DataFile> implements DataFile {
splitOffsets,
null /* no equality field IDs */,
sortOrderId,
- keyMetadata);
+ keyMetadata,
+ null /* no referenced data file */);
}
/**
diff --git a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
index 77e0d8505a..05eb7c97db 100644
--- a/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
+++ b/core/src/main/java/org/apache/iceberg/GenericDeleteFile.java
@@ -48,7 +48,8 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
int[] equalityFieldIds,
Integer sortOrderId,
List<Long> splitOffsets,
- ByteBuffer keyMetadata) {
+ ByteBuffer keyMetadata,
+ String referencedDataFile) {
super(
specId,
content,
@@ -66,7 +67,8 @@ class GenericDeleteFile extends BaseFile<DeleteFile> implements DeleteFile {
splitOffsets,
equalityFieldIds,
sortOrderId,
- keyMetadata);
+ keyMetadata,
+ referencedDataFile);
}
/**
diff --git a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
index 89f9eab719..daf1c3d72b 100644
--- a/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
+++ b/core/src/main/java/org/apache/iceberg/SnapshotProducer.java
@@ -923,5 +923,10 @@ abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
public Integer sortOrderId() {
return deleteFile.sortOrderId();
}
+
+ @Override
+ public String referencedDataFile() {
+ return deleteFile.referencedDataFile();
+ }
}
}
diff --git a/core/src/main/java/org/apache/iceberg/V2Metadata.java b/core/src/main/java/org/apache/iceberg/V2Metadata.java
index be4c3734e4..20b2169b8d 100644
--- a/core/src/main/java/org/apache/iceberg/V2Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V2Metadata.java
@@ -274,7 +274,8 @@ class V2Metadata {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
- DataFile.SORT_ORDER_ID);
+ DataFile.SORT_ORDER_ID,
+ DataFile.REFERENCED_DATA_FILE);
}
static class IndexedManifestEntry<F extends ContentFile<F>>
@@ -448,6 +449,12 @@ class V2Metadata {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
+ case 16:
+ if (wrapped instanceof DeleteFile) {
+ return ((DeleteFile) wrapped).referencedDataFile();
+ } else {
+ return null;
+ }
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
diff --git a/core/src/main/java/org/apache/iceberg/V3Metadata.java b/core/src/main/java/org/apache/iceberg/V3Metadata.java
index f295af3e10..a418a86856 100644
--- a/core/src/main/java/org/apache/iceberg/V3Metadata.java
+++ b/core/src/main/java/org/apache/iceberg/V3Metadata.java
@@ -274,7 +274,8 @@ class V3Metadata {
DataFile.KEY_METADATA,
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
- DataFile.SORT_ORDER_ID);
+ DataFile.SORT_ORDER_ID,
+ DataFile.REFERENCED_DATA_FILE);
}
static class IndexedManifestEntry<F extends ContentFile<F>>
@@ -448,6 +449,12 @@ class V3Metadata {
return wrapped.equalityFieldIds();
case 15:
return wrapped.sortOrderId();
+ case 16:
+ if (wrapped.content() == FileContent.POSITION_DELETES) {
+ return ((DeleteFile) wrapped).referencedDataFile();
+ } else {
+ return null;
+ }
}
throw new IllegalArgumentException("Unknown field ordinal: " + pos);
}
diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
index 04fc077d10..c82b3ff828 100644
--- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -54,6 +54,10 @@ public class ContentFileUtil {
return null;
}
+ if (deleteFile.referencedDataFile() != null) {
+ return deleteFile.referencedDataFile();
+ }
+
int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
diff --git a/core/src/test/java/org/apache/iceberg/TestBase.java b/core/src/test/java/org/apache/iceberg/TestBase.java
index f3bbb79795..4544163190 100644
--- a/core/src/test/java/org/apache/iceberg/TestBase.java
+++ b/core/src/test/java/org/apache/iceberg/TestBase.java
@@ -654,6 +654,18 @@ public class TestBase {
.build();
}
+ protected DeleteFile newDeleteFileWithRef(DataFile dataFile) {
+ PartitionSpec spec = table.specs().get(dataFile.specId());
+ return FileMetadata.deleteFileBuilder(spec)
+ .ofPositionDeletes()
+ .withPath("/path/to/delete-" + UUID.randomUUID() + ".parquet")
+ .withFileSizeInBytes(10)
+ .withPartition(dataFile.partition())
+ .withReferencedDataFile(dataFile.location())
+ .withRecordCount(1)
+ .build();
+ }
+
protected DeleteFile newEqualityDeleteFile(int specId, String partitionPath, int... fieldIds) {
PartitionSpec spec = table.specs().get(specId);
return FileMetadata.deleteFileBuilder(spec)
diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
index 83f7fc1f62..fbe4739316 100644
--- a/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
+++ b/core/src/test/java/org/apache/iceberg/TestContentFileParser.java
@@ -213,7 +213,33 @@ public class TestContentFileParser {
Arguments.of(
TestBase.SPEC,
deleteFileWithAllOptional(TestBase.SPEC),
- deleteFileJsonWithAllOptional(TestBase.SPEC)));
+ deleteFileJsonWithAllOptional(TestBase.SPEC)),
+ Arguments.of(
+ TestBase.SPEC, deleteFileWithDataRef(TestBase.SPEC), deleteFileWithDataRefJson()));
+ }
+
+ private static DeleteFile deleteFileWithDataRef(PartitionSpec spec) {
+ PartitionData partitionData = new PartitionData(spec.partitionType());
+ partitionData.set(0, 4);
+ return new GenericDeleteFile(
+ spec.specId(),
+ FileContent.POSITION_DELETES,
+ "/path/to/delete.parquet",
+ FileFormat.PARQUET,
+ partitionData,
+ 1234,
+ new Metrics(10L, null, null, null, null),
+ null,
+ null,
+ null,
+ null,
+ "/path/to/data/file.parquet");
+ }
+
+ private static String deleteFileWithDataRefJson() {
+ return "{\"spec-id\":0,\"content\":\"POSITION_DELETES\",\"file-path\":\"/path/to/delete.parquet\","
+ + "\"file-format\":\"PARQUET\",\"partition\":{\"1000\":4},\"file-size-in-bytes\":1234,"
+ + "\"record-count\":10,\"referenced-data-file\":\"/path/to/data/file.parquet\"}";
}
private static DeleteFile deleteFileWithRequiredOnly(PartitionSpec spec) {
@@ -234,6 +260,7 @@ public class TestContentFileParser {
null,
null,
null,
+ null,
null);
}
@@ -273,7 +300,8 @@ public class TestContentFileParser {
new int[] {3},
1,
Collections.singletonList(128L),
- ByteBuffer.wrap(new byte[16]));
+ ByteBuffer.wrap(new byte[16]),
+ null);
}
private static String deleteFileJsonWithRequiredOnly(PartitionSpec spec) {
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
index 13e8985cdb..1f29c0e5b8 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestEncryption.java
@@ -110,7 +110,8 @@ public class TestManifestEncryption {
EQUALITY_ID_ARR,
SORT_ORDER_ID,
null,
- CONTENT_KEY_METADATA);
+ CONTENT_KEY_METADATA,
+ null);
private static final EncryptionManager ENCRYPTION_MANAGER =
EncryptionTestHelpers.createEncryptionManager();
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestReader.java b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
index e45415f1f2..4652da9430 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestReader.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestReader.java
@@ -130,7 +130,7 @@ public class TestManifestReader extends TestBase {
long expectedPos = 0L;
for (DataFile file : reader) {
assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos);
- assertThat(((BaseFile) file).get(17))
+ assertThat(((BaseFile) file).get(18))
.as("Position from field index should match")
.isEqualTo(expectedPos);
expectedPos += 1;
@@ -158,7 +158,7 @@ public class TestManifestReader extends TestBase {
long expectedPos = 0L;
for (DeleteFile file : reader) {
assertThat(file.pos()).as("Position should match").isEqualTo(expectedPos);
- assertThat(((BaseFile) file).get(17))
+ assertThat(((BaseFile) file).get(18))
.as("Position from field index should match")
.isEqualTo(expectedPos);
expectedPos += 1;
@@ -181,6 +181,24 @@ public class TestManifestReader extends TestBase {
}
}
+ @TestTemplate
+ public void testDeleteFilesWithReferences() throws IOException {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(2);
+ DeleteFile deleteFile1 = newDeleteFileWithRef(FILE_A);
+ DeleteFile deleteFile2 = newDeleteFileWithRef(FILE_B);
+ ManifestFile manifest = writeDeleteManifest(formatVersion, 1000L, deleteFile1, deleteFile2);
+ try (ManifestReader<DeleteFile> reader =
+ ManifestFiles.readDeleteManifest(manifest, FILE_IO, table.specs())) {
+ for (DeleteFile deleteFile : reader) {
+ if (deleteFile.location().equals(deleteFile1.location())) {
+ assertThat(deleteFile.referencedDataFile()).isEqualTo(FILE_A.location());
+ } else {
+ assertThat(deleteFile.referencedDataFile()).isEqualTo(FILE_B.location());
+ }
+ }
+ }
+ }
+
@TestTemplate
public void testDataFileSplitOffsetsNullWhenInvalid() throws IOException {
DataFile invalidOffset =
diff --git a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
index 1d5c34fa4b..88dcc6ff9c 100644
--- a/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
+++ b/core/src/test/java/org/apache/iceberg/TestManifestWriterVersions.java
@@ -96,6 +96,7 @@ public class TestManifestWriterVersions {
EQUALITY_ID_ARR,
SORT_ORDER_ID,
null,
+ null,
null);
@TempDir private Path temp;