This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 808794bcb4 Core: Ability to build DeleteFileIndex from files (#8172)
808794bcb4 is described below

commit 808794bcb4924212e545c92dee5ec033a431167f
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Fri Jul 28 19:40:43 2023 -0700

    Core: Ability to build DeleteFileIndex from files (#8172)
---
 .../java/org/apache/iceberg/DeleteFileIndex.java   |  35 +++++-
 .../org/apache/iceberg/TestDeleteFileIndex.java    | 121 ++++++++++++++-------
 2 files changed, 117 insertions(+), 39 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index 9d54348439..977b2a387d 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -23,6 +23,7 @@ import com.github.benmanes.caffeine.cache.LoadingCache;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -49,6 +50,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
+import org.apache.iceberg.relocated.com.google.common.collect.Streams;
 import org.apache.iceberg.types.Comparators;
 import org.apache.iceberg.types.Conversions;
 import org.apache.iceberg.types.Type;
@@ -74,6 +76,8 @@ class DeleteFileIndex {
   private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition;
   private final boolean isEmpty;
 
+  /** @deprecated since 1.4.0, will be removed in 1.5.0. */
+  @Deprecated
   DeleteFileIndex(
       Map<Integer, PartitionSpec> specs,
       long[] globalSeqs,
@@ -366,9 +370,14 @@ class DeleteFileIndex {
     return new Builder(io, Sets.newHashSet(deleteManifests));
   }
 
+  static Builder builderFor(Iterable<DeleteFile> deleteFiles) {
+    return new Builder(deleteFiles);
+  }
+
   static class Builder {
     private final FileIO io;
     private final Set<ManifestFile> deleteManifests;
+    private final Iterable<DeleteFile> deleteFiles;
     private long minSequenceNumber = 0L;
     private Map<Integer, PartitionSpec> specsById = null;
     private Expression dataFilter = Expressions.alwaysTrue();
@@ -381,6 +390,13 @@ class DeleteFileIndex {
     Builder(FileIO io, Set<ManifestFile> deleteManifests) {
       this.io = io;
       this.deleteManifests = Sets.newHashSet(deleteManifests);
+      this.deleteFiles = null;
+    }
+
+    Builder(Iterable<DeleteFile> deleteFiles) {
+      this.io = null;
+      this.deleteManifests = null;
+      this.deleteFiles = deleteFiles;
     }
 
     Builder afterSequenceNumber(long seq) {
@@ -394,16 +410,22 @@ class DeleteFileIndex {
     }
 
     Builder filterData(Expression newDataFilter) {
+      Preconditions.checkArgument(
+          deleteFiles == null, "Index constructed from files does not support data filters");
       this.dataFilter = Expressions.and(dataFilter, newDataFilter);
       return this;
     }
 
     Builder filterPartitions(Expression newPartitionFilter) {
+      Preconditions.checkArgument(
+          deleteFiles == null, "Index constructed from files does not support partition filters");
       this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
       return this;
     }
 
     Builder filterPartitions(PartitionSet newPartitionSet) {
+      Preconditions.checkArgument(
+          deleteFiles == null, "Index constructed from files does not support partition filters");
       this.partitionSet = newPartitionSet;
       return this;
     }
@@ -423,7 +445,13 @@ class DeleteFileIndex {
       return this;
     }
 
-    DeleteFileIndex build() {
+    private Collection<DeleteFile> filterDeleteFiles() {
+      return Streams.stream(deleteFiles)
+          .filter(file -> file.dataSequenceNumber() > minSequenceNumber)
+          .collect(Collectors.toList());
+    }
+
+    private Collection<DeleteFile> loadDeleteFiles() {
       // read all of the matching delete manifests in parallel and accumulate the matching files in
       // a queue
       Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
@@ -444,6 +472,11 @@ class DeleteFileIndex {
                   throw new RuntimeIOException(e, "Failed to close");
                 }
               });
+      return files;
+    }
+
+    DeleteFileIndex build() {
+      Collection<DeleteFile> files = deleteFiles != null ? filterDeleteFiles() : loadDeleteFiles();
 
       // build a map from (specId, partition) to delete file entries
       Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
diff --git a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
index 95ce34ab04..4e574690f7 100644
--- a/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
+++ b/core/src/test/java/org/apache/iceberg/TestDeleteFileIndex.java
@@ -25,12 +25,11 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
+import java.util.UUID;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
 import org.apache.iceberg.relocated.com.google.common.collect.Sets;
-import org.apache.iceberg.util.Pair;
-import org.apache.iceberg.util.StructLikeWrapper;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -47,7 +46,6 @@ public class TestDeleteFileIndex extends TableTestBase {
           .withPartition(FILE_A.partition())
           .withRecordCount(1)
           .build();
-  static final DeleteFile FILE_A_POS_2 = FILE_A_POS_1.copy();
 
   static final DeleteFile FILE_A_EQ_1 =
       FileMetadata.deleteFileBuilder(SPEC)
@@ -57,9 +55,6 @@ public class TestDeleteFileIndex extends TableTestBase {
           .withPartition(FILE_A.partition())
           .withRecordCount(1)
           .build();
-  static final DeleteFile FILE_A_EQ_2 = FILE_A_EQ_1.copy();
-  static final DeleteFile[] DELETE_FILES =
-      new DeleteFile[] {FILE_A_POS_1, FILE_A_EQ_1, FILE_A_POS_2, FILE_A_EQ_2};
 
   private static DataFile unpartitionedFile(PartitionSpec spec) {
     return DataFiles.builder(spec)
@@ -72,7 +67,17 @@ public class TestDeleteFileIndex extends TableTestBase {
   private static DeleteFile unpartitionedPosDeletes(PartitionSpec spec) {
     return FileMetadata.deleteFileBuilder(spec)
         .ofPositionDeletes()
-        .withPath("/path/to/data-unpartitioned-pos-deletes.parquet")
+        .withPath(UUID.randomUUID() + "/path/to/data-unpartitioned-pos-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+  }
+
+  private static DeleteFile partitionedPosDeletes(PartitionSpec spec, StructLike partition) {
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofPositionDeletes()
+        .withPartition(partition)
+        .withPath(UUID.randomUUID() + "/path/to/data-partitioned-pos-deletes.parquet")
         .withFileSizeInBytes(10)
         .withRecordCount(1)
         .build();
@@ -81,38 +86,81 @@ public class TestDeleteFileIndex extends TableTestBase {
   private static DeleteFile unpartitionedEqDeletes(PartitionSpec spec) {
     return FileMetadata.deleteFileBuilder(spec)
         .ofEqualityDeletes()
-        .withPath("/path/to/data-unpartitioned-eq-deletes.parquet")
+        .withPath(UUID.randomUUID() + "/path/to/data-unpartitioned-eq-deletes.parquet")
+        .withFileSizeInBytes(10)
+        .withRecordCount(1)
+        .build();
+  }
+
+  private static DeleteFile partitionedEqDeletes(PartitionSpec spec, StructLike partition) {
+    return FileMetadata.deleteFileBuilder(spec)
+        .ofEqualityDeletes()
+        .withPartition(partition)
+        .withPath(UUID.randomUUID() + "/path/to/data-partitioned-eq-deletes.parquet")
         .withFileSizeInBytes(10)
         .withRecordCount(1)
         .build();
   }
 
+  @SuppressWarnings("unchecked")
+  private static <F extends ContentFile<F>> F withDataSequenceNumber(long seq, F file) {
+    BaseFile<F> baseFile = (BaseFile<F>) file;
+    baseFile.setDataSequenceNumber(seq);
+    return file;
+  }
+
+  @Test
+  public void testMinSequenceNumberFilteringForFiles() {
+    PartitionSpec partSpec = PartitionSpec.unpartitioned();
+
+    DeleteFile[] deleteFiles = {
+      withDataSequenceNumber(4, unpartitionedEqDeletes(partSpec)),
+      withDataSequenceNumber(6, unpartitionedEqDeletes(partSpec))
+    };
+
+    DeleteFileIndex index =
+        DeleteFileIndex.builderFor(Arrays.asList(deleteFiles))
+            .specsById(ImmutableMap.of(partSpec.specId(), partSpec, 1, SPEC))
+            .afterSequenceNumber(4)
+            .build();
+
+    DataFile file = unpartitionedFile(partSpec);
+
+    Assert.assertEquals("Only one delete file should apply", 1, index.forDataFile(0, file).length);
+  }
+
   @Test
   public void testUnpartitionedDeletes() {
     PartitionSpec partSpec = PartitionSpec.unpartitioned();
+
+    DeleteFile[] deleteFiles = {
+      withDataSequenceNumber(4, unpartitionedEqDeletes(partSpec)),
+      withDataSequenceNumber(6, unpartitionedEqDeletes(partSpec)),
+      withDataSequenceNumber(5, unpartitionedPosDeletes(partSpec)),
+      withDataSequenceNumber(6, unpartitionedPosDeletes(partSpec))
+    };
+
     DeleteFileIndex index =
-        new DeleteFileIndex(
-            ImmutableMap.of(partSpec.specId(), partSpec, 1, SPEC),
-            new long[] {3, 5, 5, 6},
-            DELETE_FILES,
-            ImmutableMap.of());
+        DeleteFileIndex.builderFor(Arrays.asList(deleteFiles))
+            .specsById(ImmutableMap.of(partSpec.specId(), partSpec, 1, SPEC))
+            .build();
 
     DataFile unpartitionedFile = unpartitionedFile(partSpec);
     Assert.assertArrayEquals(
-        "All deletes should apply to seq 0", DELETE_FILES, index.forDataFile(0, unpartitionedFile));
+        "All deletes should apply to seq 0", deleteFiles, index.forDataFile(0, unpartitionedFile));
     Assert.assertArrayEquals(
-        "All deletes should apply to seq 3", DELETE_FILES, index.forDataFile(3, unpartitionedFile));
+        "All deletes should apply to seq 3", deleteFiles, index.forDataFile(3, unpartitionedFile));
     Assert.assertArrayEquals(
         "Last 3 deletes should apply to seq 4",
-        Arrays.copyOfRange(DELETE_FILES, 1, 4),
+        Arrays.copyOfRange(deleteFiles, 1, 4),
         index.forDataFile(4, unpartitionedFile));
     Assert.assertArrayEquals(
         "Last 3 deletes should apply to seq 5",
-        Arrays.copyOfRange(DELETE_FILES, 1, 4),
+        Arrays.copyOfRange(deleteFiles, 1, 4),
         index.forDataFile(5, unpartitionedFile));
     Assert.assertArrayEquals(
         "Last delete should apply to seq 6",
-        Arrays.copyOfRange(DELETE_FILES, 3, 4),
+        Arrays.copyOfRange(deleteFiles, 3, 4),
         index.forDataFile(6, unpartitionedFile));
     Assert.assertArrayEquals(
         "No deletes should apply to seq 7",
@@ -127,43 +175,40 @@ public class TestDeleteFileIndex extends TableTestBase {
     DataFile partitionedFileA = FILE_A.copy();
     ((BaseFile<?>) partitionedFileA).setSpecId(1);
     Assert.assertArrayEquals(
-        "All global deletes should apply to a partitioned file",
-        DELETE_FILES,
+        "All global equality deletes should apply to a partitioned file",
+        Arrays.copyOfRange(deleteFiles, 0, 2),
         index.forDataFile(0, partitionedFileA));
   }
 
   @Test
   public void testPartitionedDeleteIndex() {
+    DeleteFile[] deleteFiles = {
+      withDataSequenceNumber(4, partitionedEqDeletes(SPEC, FILE_A.partition())),
+      withDataSequenceNumber(6, partitionedEqDeletes(SPEC, FILE_A.partition())),
+      withDataSequenceNumber(5, partitionedPosDeletes(SPEC, FILE_A.partition())),
+      withDataSequenceNumber(6, partitionedPosDeletes(SPEC, FILE_A.partition()))
+    };
+
     DeleteFileIndex index =
-        new DeleteFileIndex(
-            ImmutableMap.of(SPEC.specId(), SPEC, 1, PartitionSpec.unpartitioned()),
-            null,
-            null,
-            ImmutableMap.of(
-                Pair.of(
-                    SPEC.specId(),
-                    StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_A.partition())),
-                Pair.of(new long[] {3, 5, 5, 6}, DELETE_FILES),
-                Pair.of(
-                    SPEC.specId(),
-                    StructLikeWrapper.forType(SPEC.partitionType()).set(FILE_C.partition())),
-                Pair.of(new long[0], new DeleteFile[0])));
+        DeleteFileIndex.builderFor(Arrays.asList(deleteFiles))
+            .specsById(ImmutableMap.of(SPEC.specId(), SPEC, 1, PartitionSpec.unpartitioned()))
+            .build();
 
     Assert.assertArrayEquals(
-        "All deletes should apply to seq 0", DELETE_FILES, index.forDataFile(0, FILE_A));
+        "All deletes should apply to seq 0", deleteFiles, index.forDataFile(0, FILE_A));
     Assert.assertArrayEquals(
-        "All deletes should apply to seq 3", DELETE_FILES, index.forDataFile(3, FILE_A));
+        "All deletes should apply to seq 3", deleteFiles, index.forDataFile(3, FILE_A));
     Assert.assertArrayEquals(
         "Last 3 deletes should apply to seq 4",
-        Arrays.copyOfRange(DELETE_FILES, 1, 4),
+        Arrays.copyOfRange(deleteFiles, 1, 4),
         index.forDataFile(4, FILE_A));
     Assert.assertArrayEquals(
         "Last 3 deletes should apply to seq 5",
-        Arrays.copyOfRange(DELETE_FILES, 1, 4),
+        Arrays.copyOfRange(deleteFiles, 1, 4),
         index.forDataFile(5, FILE_A));
     Assert.assertArrayEquals(
         "Last delete should apply to seq 6",
-        Arrays.copyOfRange(DELETE_FILES, 3, 4),
+        Arrays.copyOfRange(deleteFiles, 3, 4),
         index.forDataFile(6, FILE_A));
     Assert.assertArrayEquals(
         "No deletes should apply to seq 7", new DataFile[0], index.forDataFile(7, FILE_A));

Reply via email to