This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 5bd314bdf6 Core: Support DVs in DeleteFileIndex (#11467)
5bd314bdf6 is described below
commit 5bd314bdf6c3b5e0e5346d0f7408353bdf31bc81
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Tue Nov 5 15:52:24 2024 +0100
Core: Support DVs in DeleteFileIndex (#11467)
---
.../java/org/apache/iceberg/DeleteFileIndex.java | 62 +++++++++++++++++++---
.../org/apache/iceberg/util/ContentFileUtil.java | 14 +++++
.../apache/iceberg/DeleteFileIndexTestBase.java | 52 ++++++++++++++++++
3 files changed, 121 insertions(+), 7 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 8444b91eec..ab7fec6fb1 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -33,6 +33,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import org.apache.iceberg.exceptions.RuntimeIOException;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.ManifestEvaluator;
@@ -70,6 +71,7 @@ class DeleteFileIndex {
private final PartitionMap<EqualityDeletes> eqDeletesByPartition;
private final PartitionMap<PositionDeletes> posDeletesByPartition;
private final Map<String, PositionDeletes> posDeletesByPath;
+ private final Map<String, DeleteFile> dvByPath;
private final boolean hasEqDeletes;
private final boolean hasPosDeletes;
private final boolean isEmpty;
@@ -78,13 +80,16 @@ class DeleteFileIndex {
EqualityDeletes globalDeletes,
PartitionMap<EqualityDeletes> eqDeletesByPartition,
PartitionMap<PositionDeletes> posDeletesByPartition,
- Map<String, PositionDeletes> posDeletesByPath) {
+ Map<String, PositionDeletes> posDeletesByPath,
+ Map<String, DeleteFile> dvByPath) {
this.globalDeletes = globalDeletes;
this.eqDeletesByPartition = eqDeletesByPartition;
this.posDeletesByPartition = posDeletesByPartition;
this.posDeletesByPath = posDeletesByPath;
+ this.dvByPath = dvByPath;
this.hasEqDeletes = globalDeletes != null || eqDeletesByPartition != null;
- this.hasPosDeletes = posDeletesByPartition != null || posDeletesByPath != null;
+ this.hasPosDeletes =
+ posDeletesByPartition != null || posDeletesByPath != null || dvByPath != null;
this.isEmpty = !hasEqDeletes && !hasPosDeletes;
}
@@ -125,6 +130,10 @@ class DeleteFileIndex {
}
}
+ if (dvByPath != null) {
+ deleteFiles = Iterables.concat(deleteFiles, dvByPath.values());
+ }
+
return deleteFiles;
}
@@ -143,9 +152,16 @@ class DeleteFileIndex {
DeleteFile[] global = findGlobalDeletes(sequenceNumber, file);
DeleteFile[] eqPartition = findEqPartitionDeletes(sequenceNumber, file);
- DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
- DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
- return concat(global, eqPartition, posPartition, posPath);
+ DeleteFile dv = findDV(sequenceNumber, file);
+ if (dv != null && global == null && eqPartition == null) {
+ return new DeleteFile[] {dv};
+ } else if (dv != null) {
+ return concat(global, eqPartition, new DeleteFile[] {dv});
+ } else {
+ DeleteFile[] posPartition = findPosPartitionDeletes(sequenceNumber, file);
+ DeleteFile[] posPath = findPathDeletes(sequenceNumber, file);
+ return concat(global, eqPartition, posPartition, posPath);
+ }
}
private DeleteFile[] findGlobalDeletes(long seq, DataFile dataFile) {
@@ -180,6 +196,22 @@ class DeleteFileIndex {
return deletes == null ? EMPTY_DELETES : deletes.filter(seq);
}
+ private DeleteFile findDV(long seq, DataFile dataFile) {
+ if (dvByPath == null) {
+ return null;
+ }
+
+ DeleteFile dv = dvByPath.get(dataFile.location());
+ if (dv != null) {
+ ValidationException.check(
+ dv.dataSequenceNumber() >= seq,
+ "DV data sequence number (%s) must be greater than or equal to data file sequence number (%s)",
+ dv.dataSequenceNumber(),
+ seq);
+ }
+ return dv;
+ }
+
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean canContainEqDeletesForFile(
DataFile dataFile, EqualityDeleteFile deleteFile) {
@@ -434,11 +466,16 @@ class DeleteFileIndex {
PartitionMap<EqualityDeletes> eqDeletesByPartition = PartitionMap.create(specsById);
PartitionMap<PositionDeletes> posDeletesByPartition = PartitionMap.create(specsById);
Map<String, PositionDeletes> posDeletesByPath = Maps.newHashMap();
+ Map<String, DeleteFile> dvByPath = Maps.newHashMap();
for (DeleteFile file : files) {
switch (file.content()) {
case POSITION_DELETES:
- add(posDeletesByPath, posDeletesByPartition, file);
+ if (ContentFileUtil.isDV(file)) {
+ add(dvByPath, file);
+ } else {
+ add(posDeletesByPath, posDeletesByPartition, file);
+ }
break;
case EQUALITY_DELETES:
add(globalDeletes, eqDeletesByPartition, file);
@@ -453,7 +490,18 @@ class DeleteFileIndex {
globalDeletes.isEmpty() ? null : globalDeletes,
eqDeletesByPartition.isEmpty() ? null : eqDeletesByPartition,
posDeletesByPartition.isEmpty() ? null : posDeletesByPartition,
- posDeletesByPath.isEmpty() ? null : posDeletesByPath);
+ posDeletesByPath.isEmpty() ? null : posDeletesByPath,
+ dvByPath.isEmpty() ? null : dvByPath);
+ }
+
+ private void add(Map<String, DeleteFile> dvByPath, DeleteFile dv) {
+ String path = dv.referencedDataFile();
+ DeleteFile existingDV = dvByPath.putIfAbsent(path, dv);
+ if (existingDV != null) {
+ throw new ValidationException(
+ "Can't index multiple DVs for %s: %s and %s",
+ path, ContentFileUtil.dvDesc(dv), ContentFileUtil.dvDesc(existingDV));
+ }
}
private void add(
diff --git a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
index c82b3ff828..e4666bd1bd 100644
--- a/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
+++ b/core/src/main/java/org/apache/iceberg/util/ContentFileUtil.java
@@ -24,6 +24,7 @@ import java.util.Set;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -84,4 +85,17 @@ public class ContentFileUtil {
CharSequence location = referencedDataFile(deleteFile);
return location != null ? location.toString() : null;
}
+
+ public static boolean isDV(DeleteFile deleteFile) {
+ return deleteFile.format() == FileFormat.PUFFIN;
+ }
+
+ public static String dvDesc(DeleteFile deleteFile) {
+ return String.format(
+ "DV{location=%s, offset=%s, length=%s, referencedDataFile=%s}",
+ deleteFile.location(),
+ deleteFile.contentOffset(),
+ deleteFile.contentSizeInBytes(),
+ deleteFile.referencedDataFile());
+ }
}
diff --git a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
index de7e59ac17..6ef28191e7 100644
--- a/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
+++ b/core/src/test/java/org/apache/iceberg/DeleteFileIndexTestBase.java
@@ -22,20 +22,24 @@ import static
org.apache.iceberg.expressions.Expressions.bucket;
import static org.apache.iceberg.expressions.Expressions.equal;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assumptions.assumeThat;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.UUID;
import org.apache.iceberg.DeleteFileIndex.EqualityDeletes;
import org.apache.iceberg.DeleteFileIndex.PositionDeletes;
+import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
+import org.apache.iceberg.util.ContentFileUtil;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -621,4 +625,52 @@ public abstract class DeleteFileIndexTestBase<
// it should not be possible to add more elements upon indexing
assertThatThrownBy(() -> group.add(SPEC, file1)).isInstanceOf(IllegalStateException.class);
}
+
+ @TestTemplate
+ public void testMixDeleteFilesAndDVs() {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ List<DeleteFile> deletes =
+ Arrays.asList(
+ withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_A.partition())),
+ withDataSequenceNumber(2, newDV(FILE_A)),
+ withDataSequenceNumber(1, partitionedPosDeletes(SPEC, FILE_B.partition())),
+ withDataSequenceNumber(2, partitionedPosDeletes(SPEC, FILE_B.partition())));
+
+ DeleteFileIndex index = DeleteFileIndex.builderFor(deletes).specsById(table.specs()).build();
+
+ DeleteFile[] fileADeletes = index.forDataFile(0, FILE_A);
+ assertThat(fileADeletes).as("Only DV should apply to FILE_A").hasSize(1);
+ assertThat(ContentFileUtil.isDV(fileADeletes[0])).isTrue();
+ assertThat(fileADeletes[0].referencedDataFile()).isEqualTo(FILE_A.location());
+
+ DeleteFile[] fileBDeletes = index.forDataFile(0, FILE_B);
+ assertThat(fileBDeletes).as("Two delete files should apply to FILE_B").hasSize(2);
+ assertThat(ContentFileUtil.isDV(fileBDeletes[0])).isFalse();
+ assertThat(ContentFileUtil.isDV(fileBDeletes[1])).isFalse();
+ }
+
+ @TestTemplate
+ public void testMultipleDVs() {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+
+ DeleteFile dv1 = withDataSequenceNumber(1, newDV(FILE_A));
+ DeleteFile dv2 = withDataSequenceNumber(2, newDV(FILE_A));
+ List<DeleteFile> dvs = Arrays.asList(dv1, dv2);
+
+ assertThatThrownBy(() -> DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build())
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("Can't index multiple DVs for %s", FILE_A.location());
+ }
+
+ @TestTemplate
+ public void testInvalidDVSequenceNumber() {
+ assumeThat(formatVersion).isGreaterThanOrEqualTo(3);
+ DeleteFile dv = withDataSequenceNumber(1, newDV(FILE_A));
+ List<DeleteFile> dvs = Collections.singletonList(dv);
+ DeleteFileIndex index = DeleteFileIndex.builderFor(dvs).specsById(table.specs()).build();
+ assertThatThrownBy(() -> index.forDataFile(2, FILE_A))
+ .isInstanceOf(ValidationException.class)
+ .hasMessageContaining("must be greater than or equal to data file sequence number");
+ }
}