This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 808794bcb4 Core: Ability to build DeleteFileIndex from files (#8172)
808794bcb4 is described below
commit 808794bcb4924212e545c92dee5ec033a431167f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Jul 28 19:40:43 2023 -0700
Core: Ability to build DeleteFileIndex from files (#8172)
---
.../java/org/apache/iceberg/DeleteFileIndex.java | 35 +++++-
.../org/apache/iceberg/TestDeleteFileIndex.java | 121 ++++++++++++++-------
2 files changed, 117 insertions(+), 39 deletions(-)
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 9d54348439..977b2a387d 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -23,6 +23,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -49,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -74,6 +76,8 @@ class DeleteFileIndex {
private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup>
deletesByPartition;
private final boolean isEmpty;
+ /** @deprecated since 1.4.0, will be removed in 1.5.0. */
+ @Deprecated
DeleteFileIndex(
Map<Integer, PartitionSpec> specs,
long[] globalSeqs,
@@ -366,9 +370,14 @@ class DeleteFileIndex {
return new Builder(io, Sets.newHashSet(deleteManifests));
}
+ static Builder builderFor(Iterable<DeleteFile> deleteFiles) {
+ return new Builder(deleteFiles);
+ }
+
static class Builder {
private final FileIO io;
private final Set<ManifestFile> deleteManifests;
+ private final Iterable<DeleteFile> deleteFiles;
private long minSequenceNumber = 0L;
private Map<Integer, PartitionSpec> specsById = null;
private Expression dataFilter = Expressions.alwaysTrue();
@@ -381,6 +390,13 @@ class DeleteFileIndex {
Builder(FileIO io, Set<ManifestFile> deleteManifests) {
this.io = io;
this.deleteManifests = Sets.newHashSet(deleteManifests);
+ this.deleteFiles = null;
+ }
+
+ Builder(Iterable<DeleteFile> deleteFiles) {
+ this.io = null;
+ this.deleteManifests = null;
+ this.deleteFiles = deleteFiles;
}
Builder afterSequenceNumber(long seq) {
@@ -394,16 +410,22 @@ class DeleteFileIndex {
}
Builder filterData(Expression newDataFilter) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
data filters");
this.dataFilter = Expressions.and(dataFilter, newDataFilter);
return this;
}
Builder filterPartitions(Expression newPartitionFilter) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
partition filters");
this.partitionFilter = Expressions.and(partitionFilter,
newPartitionFilter);
return this;
}
Builder filterPartitions(PartitionSet newPartitionSet) {
+ Preconditions.checkArgument(
+ deleteFiles == null, "Index constructed from files does not support
partition filters");
this.partitionSet = newPartitionSet;
return this;
}
@@ -423,7 +445,13 @@ class DeleteFileIndex {
return this;
}
- DeleteFileIndex build() {
+ private Collection<DeleteFile> filterDeleteFiles() {
+ return Streams.stream(deleteFiles)
+ .filter(file -> file.dataSequenceNumber() > minSequenceNumber)
+ .collect(Collectors.toList());
+ }
+
+ private Collection<DeleteFile> loadDeleteFiles() {
// read all of the matching delete manifests in parallel and accumulate
the matching files in
// a queue
Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
@@ -444,6 +472,11 @@ class DeleteFileIndex {
throw new RuntimeIOException(e, "Failed to close");
}
});
+ return files;
+ }
+
+ DeleteFileIndex build() {
+ Collection<DeleteFile> files = deleteFiles != null ? filterDeleteFiles()
: loadDeleteFiles();
// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
index 95ce34ab04..4e574690f7 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
@@ -25,12 +25,11 @@ import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.UUID;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.StructLikeWrapper;
import org.junit.Assert;
import org.junit.Test;
@@ -47,7 +46,6 @@ public class TestDeleteFileIndex extends TableTestBase {
.withPartition(FILE_A.partition())
.withRecordCount(1)
.build();
- static final DeleteFile FILE_A_POS_2 = FILE_A_POS_1.copy();
static final DeleteFile FILE_A_EQ_1 =
FileMetadata.deleteFileBuilder(SPEC)
@@ -57,9 +55,6 @@ public class TestDeleteFileIndex extends TableTestBase {
.withPartition(FILE_A.partition())
.withRecordCount(1)
.build();
- static final DeleteFile FILE_A_EQ_2 = FILE_A_EQ_1.copy();
- static final DeleteFile[] DELETE_FILES =
- new DeleteFile[] {FILE_A_POS_1, FILE_A_EQ_1, FILE_A_POS_2, FILE_A_EQ_2};
private static DataFile unpartitionedFile(PartitionSpec spec) {
return DataFiles.builder(spec)
@@ -72,7 +67,17 @@ public class TestDeleteFileIndex extends TableTestBase {
private static DeleteFile unpartitionedPosDeletes(PartitionSpec spec) {
return FileMetadata.deleteFileBuilder(spec)
.ofPositionDeletes()
- .withPath("/path/to/data-unpartitioned-pos-deletes.parquet")
+ .withPath(UUID.randomUUID() +
"/path/to/data-unpartitioned-pos-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ }
+
+ private static DeleteFile partitionedPosDeletes(PartitionSpec spec,
StructLike partition) {
+ return FileMetadata.deleteFileBuilder(spec)
+ .ofPositionDeletes()
+ .withPartition(partition)
+ .withPath(UUID.randomUUID() +
"/path/to/data-partitioned-pos-deletes.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
@@ -81,38 +86,81 @@ public class TestDeleteFileIndex extends TableTestBase {
private static DeleteFile unpartitionedEqDeletes(PartitionSpec spec) {
return FileMetadata.deleteFileBuilder(spec)
.ofEqualityDeletes()
- .withPath("/path/to/data-unpartitioned-eq-deletes.parquet")
+ .withPath(UUID.randomUUID() +
"/path/to/data-unpartitioned-eq-deletes.parquet")
+ .withFileSizeInBytes(10)
+ .withRecordCount(1)
+ .build();
+ }
+
+ private static DeleteFile partitionedEqDeletes(PartitionSpec spec,
StructLike partition) {
+ return FileMetadata.deleteFileBuilder(spec)
+ .ofEqualityDeletes()
+ .withPartition(partition)
+ .withPath(UUID.randomUUID() +
"/path/to/data-partitioned-eq-deletes.parquet")
.withFileSizeInBytes(10)
.withRecordCount(1)
.build();
}
+ @SuppressWarnings("unchecked")
+ private static <F extends ContentFile<F>> F withDataSequenceNumber(long seq,
F file) {
+ BaseFile<F> baseFile = (BaseFile<F>) file;
+ baseFile.setDataSequenceNumber(seq);
+ return file;
+ }
+
+ @Test
+ public void testMinSequenceNumberFilteringForFiles() {
+ PartitionSpec partSpec = PartitionSpec.unpartitioned();
+
+ DeleteFile[] deleteFiles = {
+ withDataSequenceNumber(4, unpartitionedEqDeletes(partSpec)),
+ withDataSequenceNumber(6, unpartitionedEqDeletes(partSpec))
+ };
+
+ DeleteFileIndex index =
+ DeleteFileIndex.builderFor(Arrays.asList(deleteFiles))
+ .specsById(ImmutableMap.of(partSpec.specId(), partSpec, 1, SPEC))
+ .afterSequenceNumber(4)
+ .build();
+
+ DataFile file = unpartitionedFile(partSpec);
+
+ Assert.assertEquals("Only one delete file should apply", 1,
index.forDataFile(0, file).length);
+ }
+
@Test
public void testUnpartitionedDeletes() {
PartitionSpec partSpec = PartitionSpec.unpartitioned();
+
+ DeleteFile[] deleteFiles = {
+ withDataSequenceNumber(4, unpartitionedEqDeletes(partSpec)),
+ withDataSequenceNumber(6, unpartitionedEqDeletes(partSpec)),
+ withDataSequenceNumber(5, unpartitionedPosDeletes(partSpec)),
+ withDataSequenceNumber(6, unpartitionedPosDeletes(partSpec))
+ };
+
DeleteFileIndex index =
- new DeleteFileIndex(
- ImmutableMap.of(partSpec.specId(), partSpec, 1, SPEC),
- new long[] {3, 5, 5, 6},
- DELETE_FILES,
- ImmutableMap.of());
+ DeleteFileIndex.builderFor(Arrays.asList(deleteFiles))
+ .specsById(ImmutableMap.of(partSpec.specId(), partSpec, 1, SPEC))
+ .build();
DataFile unpartitionedFile = unpartitionedFile(partSpec);
Assert.assertArrayEquals(
- "All deletes should apply to seq 0", DELETE_FILES,
index.forDataFile(0, unpartitionedFile));
+ "All deletes should apply to seq 0", deleteFiles, index.forDataFile(0,
unpartitionedFile));
Assert.assertArrayEquals(
- "All deletes should apply to seq 3", DELETE_FILES,
index.forDataFile(3, unpartitionedFile));
+ "All deletes should apply to seq 3", deleteFiles, index.forDataFile(3,
unpartitionedFile));
Assert.assertArrayEquals(
"Last 3 deletes should apply to seq 4",
- Arrays.copyOfRange(DELETE_FILES, 1, 4),
+ Arrays.copyOfRange(deleteFiles, 1, 4),
index.forDataFile(4, unpartitionedFile));
Assert.assertArrayEquals(
"Last 3 deletes should apply to seq 5",
- Arrays.copyOfRange(DELETE_FILES, 1, 4),
+ Arrays.copyOfRange(deleteFiles, 1, 4),
index.forDataFile(5, unpartitionedFile));
Assert.assertArrayEquals(
"Last delete should apply to seq 6",
- Arrays.copyOfRange(DELETE_FILES, 3, 4),
+ Arrays.copyOfRange(deleteFiles, 3, 4),
index.forDataFile(6, unpartitionedFile));
Assert.assertArrayEquals(
"No deletes should apply to seq 7",
@@ -127,43 +175,40 @@ public class TestDeleteFileIndex extends TableTestBase {
DataFile partitionedFileA = FILE_A.copy();
((BaseFile<?>) partitionedFileA).setSpecId(1);
Assert.assertArrayEquals(
- "All global deletes should apply to a partitioned file",
- DELETE_FILES,
+ "All global equality deletes should apply to a partitioned file",
+ Arrays.copyOfRange(deleteFiles, 0, 2),
index.forDataFile(0, partitionedFileA));
}
@Test
public void testPartitionedDeleteIndex() {
+ DeleteFile[] deleteFiles = {
+ withDataSequenceNumber(4, partitionedEqDeletes(SPEC,
FILE_A.partition())),
+ withDataSequenceNumber(6, partitionedEqDeletes(SPEC,
FILE_A.partition())),
+ withDataSequenceNumber(5, partitionedPosDeletes(SPEC,
FILE_A.partition())),
+ withDataSequenceNumber(6, partitionedPosDeletes(SPEC,
FILE_A.partition()))
+ };
+
DeleteFileIndex index =
- new DeleteFileIndex(
- ImmutableMap.of(SPEC.specId(), SPEC, 1,
PartitionSpec.unpartitioned()),
- null,
- null,
- ImmutableMap.of(
- Pair.of(
- SPEC.specId(),
-
StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_A.partition())),
- Pair.of(new long[] {3, 5, 5, 6}, DELETE_FILES),
- Pair.of(
- SPEC.specId(),
-
StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_C.partition())),
- Pair.of(new long[0], new DeleteFile[0])));
+ DeleteFileIndex.builderFor(Arrays.asList(deleteFiles))
+ .specsById(ImmutableMap.of(SPEC.specId(), SPEC, 1,
PartitionSpec.unpartitioned()))
+ .build();
Assert.assertArrayEquals(
- "All deletes should apply to seq 0", DELETE_FILES,
index.forDataFile(0, FILE_A));
+ "All deletes should apply to seq 0", deleteFiles, index.forDataFile(0,
FILE_A));
Assert.assertArrayEquals(
- "All deletes should apply to seq 3", DELETE_FILES,
index.forDataFile(3, FILE_A));
+ "All deletes should apply to seq 3", deleteFiles, index.forDataFile(3,
FILE_A));
Assert.assertArrayEquals(
"Last 3 deletes should apply to seq 4",
- Arrays.copyOfRange(DELETE_FILES, 1, 4),
+ Arrays.copyOfRange(deleteFiles, 1, 4),
index.forDataFile(4, FILE_A));
Assert.assertArrayEquals(
"Last 3 deletes should apply to seq 5",
- Arrays.copyOfRange(DELETE_FILES, 1, 4),
+ Arrays.copyOfRange(deleteFiles, 1, 4),
index.forDataFile(5, FILE_A));
Assert.assertArrayEquals(
"Last delete should apply to seq 6",
- Arrays.copyOfRange(DELETE_FILES, 3, 4),
+ Arrays.copyOfRange(deleteFiles, 3, 4),
index.forDataFile(6, FILE_A));
Assert.assertArrayEquals(
"No deletes should apply to seq 7", new DataFile[0],
index.forDataFile(7, FILE_A));