This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cc9a5d90f Core: Optimize DeleteFileIndex (#8157)
9cc9a5d90f is described below

commit 9cc9a5d90fab4707e6bb0b7423c3a730a77c31b0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Jul 26 23:10:03 2023 -0700

    Core: Optimize DeleteFileIndex (#8157)
---
 .../java/org/apache/iceberg/DeleteFileIndex.java   | 435 ++++++++++++++-------
 .../apache/iceberg/spark/PlanningBenchmark.java    | 301 ++++++++++++++
 2 files changed, 595 insertions(+), 141 deletions(-)

diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index eedde21397..9d54348439 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -66,43 +66,48 @@ import org.apache.iceberg.util.Tasks;
  * file.
  */
 class DeleteFileIndex {
-  private final Map<Integer, PartitionSpec> specsById;
+  private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+
   private final Map<Integer, Types.StructType> partitionTypeById;
   private final Map<Integer, ThreadLocal<StructLikeWrapper>> wrapperById;
-  private final long[] globalSeqs;
-  private final DeleteFile[] globalDeletes;
-  private final Map<Pair<Integer, StructLikeWrapper>, Pair<long[], 
DeleteFile[]>>
-      sortedDeletesByPartition;
+  private final DeleteFileGroup globalDeletes;
+  private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> 
deletesByPartition;
+  private final boolean isEmpty;
 
   DeleteFileIndex(
-      Map<Integer, PartitionSpec> specsById,
+      Map<Integer, PartitionSpec> specs,
       long[] globalSeqs,
       DeleteFile[] globalDeletes,
-      Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> 
sortedDeletesByPartition) {
-    this.specsById = specsById;
+      Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> 
deletesByPartition) {
+    this(specs, index(specs, globalSeqs, globalDeletes), index(specs, 
deletesByPartition));
+  }
+
+  private DeleteFileIndex(
+      Map<Integer, PartitionSpec> specs,
+      DeleteFileGroup globalDeletes,
+      Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> 
deletesByPartition) {
     ImmutableMap.Builder<Integer, Types.StructType> builder = 
ImmutableMap.builder();
-    specsById.forEach((specId, spec) -> builder.put(specId, 
spec.partitionType()));
+    specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
     this.partitionTypeById = builder.build();
     this.wrapperById = Maps.newConcurrentMap();
-    this.globalSeqs = globalSeqs;
     this.globalDeletes = globalDeletes;
-    this.sortedDeletesByPartition = sortedDeletesByPartition;
+    this.deletesByPartition = deletesByPartition;
+    this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty();
   }
 
   public boolean isEmpty() {
-    return (globalDeletes == null || globalDeletes.length == 0)
-        && sortedDeletesByPartition.isEmpty();
+    return isEmpty;
   }
 
   public Iterable<DeleteFile> referencedDeleteFiles() {
     Iterable<DeleteFile> deleteFiles = Collections.emptyList();
 
     if (globalDeletes != null) {
-      deleteFiles = Iterables.concat(deleteFiles, 
Arrays.asList(globalDeletes));
+      deleteFiles = Iterables.concat(deleteFiles, 
globalDeletes.referencedDeleteFiles());
     }
 
-    for (Pair<long[], DeleteFile[]> partitionDeletes : 
sortedDeletesByPartition.values()) {
-      deleteFiles = Iterables.concat(deleteFiles, 
Arrays.asList(partitionDeletes.second()));
+    for (DeleteFileGroup partitionDeletes : deletesByPartition.values()) {
+      deleteFiles = Iterables.concat(deleteFiles, 
partitionDeletes.referencedDeleteFiles());
     }
 
     return deleteFiles;
@@ -123,64 +128,63 @@ class DeleteFileIndex {
   }
 
   DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+    if (isEmpty) {
+      return NO_DELETES;
+    }
+
     Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), 
file.partition());
-    Pair<long[], DeleteFile[]> partitionDeletes = 
sortedDeletesByPartition.get(partition);
+    DeleteFileGroup partitionDeletes = deletesByPartition.get(partition);
+
+    if (globalDeletes == null && partitionDeletes == null) {
+      return NO_DELETES;
+    }
 
-    Stream<DeleteFile> matchingDeletes;
+    Stream<IndexedDeleteFile> matchingDeletes;
     if (partitionDeletes == null) {
-      matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, 
globalDeletes);
+      matchingDeletes = globalDeletes.limit(sequenceNumber);
     } else if (globalDeletes == null) {
-      matchingDeletes =
-          limitBySequenceNumber(
-              sequenceNumber, partitionDeletes.first(), 
partitionDeletes.second());
+      matchingDeletes = partitionDeletes.limit(sequenceNumber);
     } else {
-      matchingDeletes =
-          Stream.concat(
-              limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
-              limitBySequenceNumber(
-                  sequenceNumber, partitionDeletes.first(), 
partitionDeletes.second()));
+      Stream<IndexedDeleteFile> matchingGlobalDeletes = 
globalDeletes.limit(sequenceNumber);
+      Stream<IndexedDeleteFile> matchingPartitionDeletes = 
partitionDeletes.limit(sequenceNumber);
+      matchingDeletes = Stream.concat(matchingGlobalDeletes, 
matchingPartitionDeletes);
     }
 
     return matchingDeletes
-        .filter(
-            deleteFile ->
-                canContainDeletesForFile(file, deleteFile, 
specsById.get(file.specId()).schema()))
+        .filter(deleteFile -> canContainDeletesForFile(file, deleteFile))
+        .map(IndexedDeleteFile::wrapped)
         .toArray(DeleteFile[]::new);
   }
 
-  private static boolean canContainDeletesForFile(
-      DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+  private static boolean canContainDeletesForFile(DataFile dataFile, 
IndexedDeleteFile deleteFile) {
     switch (deleteFile.content()) {
       case POSITION_DELETES:
         return canContainPosDeletesForFile(dataFile, deleteFile);
 
       case EQUALITY_DELETES:
-        return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+        return canContainEqDeletesForFile(dataFile, deleteFile, 
deleteFile.spec().schema());
     }
 
     return true;
   }
 
-  private static boolean canContainPosDeletesForFile(DataFile dataFile, 
DeleteFile deleteFile) {
+  private static boolean canContainPosDeletesForFile(
+      DataFile dataFile, IndexedDeleteFile deleteFile) {
     // check that the delete file can contain the data file's file_path
-    Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
-    Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
-    if (lowers == null || uppers == null) {
+    if (deleteFile.hasNoLowerOrUpperBounds()) {
       return true;
     }
 
-    Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
     int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
     Comparator<CharSequence> comparator = Comparators.charSequences();
-    ByteBuffer lower = lowers.get(pathId);
-    if (lower != null
-        && comparator.compare(dataFile.path(), 
Conversions.fromByteBuffer(pathType, lower)) < 0) {
+
+    CharSequence lower = deleteFile.lowerBound(pathId);
+    if (lower != null && comparator.compare(dataFile.path(), lower) < 0) {
       return false;
     }
 
-    ByteBuffer upper = uppers.get(pathId);
-    if (upper != null
-        && comparator.compare(dataFile.path(), 
Conversions.fromByteBuffer(pathType, upper)) > 0) {
+    CharSequence upper = deleteFile.upperBound(pathId);
+    if (upper != null && comparator.compare(dataFile.path(), upper) > 0) {
       return false;
     }
 
@@ -189,20 +193,15 @@ class DeleteFileIndex {
 
   @SuppressWarnings("checkstyle:CyclomaticComplexity")
   private static boolean canContainEqDeletesForFile(
-      DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+      DataFile dataFile, IndexedDeleteFile deleteFile, Schema schema) {
+    Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+    Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+
     // whether to check data ranges or to assume that the ranges match
     // if upper/lower bounds are missing, null counts may still be used to 
determine delete files
     // can be skipped
     boolean checkRanges =
-        dataFile.lowerBounds() != null
-            && dataFile.upperBounds() != null
-            && deleteFile.lowerBounds() != null
-            && deleteFile.upperBounds() != null;
-
-    Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
-    Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
-    Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
-    Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();
+        dataLowers != null && dataUppers != null && 
deleteFile.hasLowerAndUpperBounds();
 
     Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
     Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
@@ -241,15 +240,14 @@ class DeleteFileIndex {
 
       ByteBuffer dataLower = dataLowers.get(id);
       ByteBuffer dataUpper = dataUppers.get(id);
-      ByteBuffer deleteLower = deleteLowers.get(id);
-      ByteBuffer deleteUpper = deleteUppers.get(id);
+      Object deleteLower = deleteFile.lowerBound(id);
+      Object deleteUpper = deleteFile.upperBound(id);
       if (dataLower == null || dataUpper == null || deleteLower == null || 
deleteUpper == null) {
         // at least one bound is not known, assume the delete file may match
         continue;
       }
 
-      if (!rangesOverlap(
-          field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, 
deleteUpper)) {
+      if (!rangesOverlap(field, dataLower, dataUpper, deleteLower, 
deleteUpper)) {
         // no values overlap between the data file and the deletes
         return false;
       }
@@ -259,19 +257,25 @@ class DeleteFileIndex {
   }
 
   private static <T> boolean rangesOverlap(
-      Type.PrimitiveType type,
+      Types.NestedField field,
       ByteBuffer dataLowerBuf,
       ByteBuffer dataUpperBuf,
-      ByteBuffer deleteLowerBuf,
-      ByteBuffer deleteUpperBuf) {
+      T deleteLower,
+      T deleteUpper) {
+    Type.PrimitiveType type = field.type().asPrimitiveType();
     Comparator<T> comparator = Comparators.forType(type);
+
     T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+    if (comparator.compare(dataLower, deleteUpper) > 0) {
+      return false;
+    }
+
     T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
-    T deleteLower = Conversions.fromByteBuffer(type, deleteLowerBuf);
-    T deleteUpper = Conversions.fromByteBuffer(type, deleteUpperBuf);
+    if (comparator.compare(deleteLower, dataUpper) > 0) {
+      return false;
+    }
 
-    return comparator.compare(deleteLower, dataUpper) <= 0
-        && comparator.compare(dataLower, deleteUpper) <= 0;
+    return true;
   }
 
   private static boolean allNonNull(Map<Integer, Long> nullValueCounts, 
Types.NestedField field) {
@@ -327,27 +331,35 @@ class DeleteFileIndex {
     return nullValueCount > 0;
   }
 
-  private static Stream<DeleteFile> limitBySequenceNumber(
-      long sequenceNumber, long[] seqs, DeleteFile[] files) {
-    if (files == null) {
-      return Stream.empty();
+  private static DeleteFileGroup index(
+      Map<Integer, PartitionSpec> specs, Pair<long[], DeleteFile[]> pairs) {
+    return index(specs, pairs.first(), pairs.second());
+  }
+
+  private static DeleteFileGroup index(
+      Map<Integer, PartitionSpec> specs, long[] seqs, DeleteFile[] files) {
+    if (files == null || files.length == 0) {
+      return null;
     }
 
-    int pos = Arrays.binarySearch(seqs, sequenceNumber);
-    int start;
-    if (pos < 0) {
-      // the sequence number was not found, where it would be inserted is 
-(pos + 1)
-      start = -(pos + 1);
-    } else {
-      // the sequence number was found, but may not be the first
-      // find the first delete file with the given sequence number by 
decrementing the position
-      start = pos;
-      while (start > 0 && seqs[start - 1] >= sequenceNumber) {
-        start -= 1;
-      }
+    IndexedDeleteFile[] indexedGlobalDeleteFiles = new 
IndexedDeleteFile[files.length];
+
+    for (int pos = 0; pos < files.length; pos++) {
+      DeleteFile file = files[pos];
+      PartitionSpec spec = specs.get(file.specId());
+      long applySequenceNumber = seqs[pos];
+      indexedGlobalDeleteFiles[pos] = new IndexedDeleteFile(spec, file, 
applySequenceNumber);
     }
 
-    return Arrays.stream(files, start, files.length);
+    return new DeleteFileGroup(seqs, indexedGlobalDeleteFiles);
+  }
+
+  private static Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> index(
+      Map<Integer, PartitionSpec> specs,
+      Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> 
deletesByPartition) {
+    Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> indexed = 
Maps.newHashMap();
+    deletesByPartition.forEach((key, value) -> indexed.put(key, index(specs, 
value)));
+    return indexed;
   }
 
   static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) 
{
@@ -414,7 +426,7 @@ class DeleteFileIndex {
     DeleteFileIndex build() {
       // read all of the matching delete manifests in parallel and accumulate 
the matching files in
       // a queue
-      Queue<ManifestEntry<DeleteFile>> deleteEntries = new 
ConcurrentLinkedQueue<>();
+      Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
       Tasks.foreach(deleteManifestReaders())
           .stopOnFailure()
           .throwFailureWhenFinished()
@@ -425,7 +437,7 @@ class DeleteFileIndex {
                   for (ManifestEntry<DeleteFile> entry : reader) {
                     if (entry.dataSequenceNumber() > minSequenceNumber) {
                       // copy with stats for better filtering against data 
file stats
-                      deleteEntries.add(entry.copy());
+                      files.add(entry.file().copy());
                     }
                   }
                 } catch (IOException e) {
@@ -435,86 +447,61 @@ class DeleteFileIndex {
 
       // build a map from (specId, partition) to delete file entries
       Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
-      ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>>
-          deleteFilesByPartition =
-              Multimaps.newListMultimap(Maps.newHashMap(), 
Lists::newArrayList);
-      for (ManifestEntry<DeleteFile> entry : deleteEntries) {
-        int specId = entry.file().specId();
+      ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> 
deleteFilesByPartition =
+          Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+      for (DeleteFile file : files) {
+        int specId = file.specId();
+        PartitionSpec spec = specsById.get(specId);
         StructLikeWrapper wrapper =
             wrappersBySpecId
-                .computeIfAbsent(
-                    specId, id -> 
StructLikeWrapper.forType(specsById.get(id).partitionType()))
-                .copyFor(entry.file().partition());
-        deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
+                .computeIfAbsent(specId, id -> 
StructLikeWrapper.forType(spec.partitionType()))
+                .copyFor(file.partition());
+        deleteFilesByPartition.put(Pair.of(specId, wrapper), new 
IndexedDeleteFile(spec, file));
       }
 
       // sort the entries in each map value by sequence number and split into 
sequence numbers and
       // delete files lists
-      Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> 
sortedDeletesByPartition =
+      Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> 
sortedDeletesByPartition =
           Maps.newHashMap();
       // also, separate out equality deletes in an unpartitioned spec that 
should be applied
       // globally
-      long[] globalApplySeqs = null;
-      DeleteFile[] globalDeletes = null;
+      DeleteFileGroup globalDeletes = null;
       for (Pair<Integer, StructLikeWrapper> partition : 
deleteFilesByPartition.keySet()) {
         if (specsById.get(partition.first()).isUnpartitioned()) {
           Preconditions.checkState(
               globalDeletes == null, "Detected multiple partition specs with 
no partitions");
 
-          List<Pair<Long, DeleteFile>> eqFilesSortedBySeq =
+          IndexedDeleteFile[] eqFilesSortedBySeq =
               deleteFilesByPartition.get(partition).stream()
-                  .filter(entry -> entry.file().content() == 
FileContent.EQUALITY_DELETES)
-                  .map(
-                      entry ->
-                          // a delete file is indexed by the sequence number 
it should be applied to
-                          Pair.of(entry.dataSequenceNumber() - 1, 
entry.file()))
-                  .sorted(Comparator.comparingLong(Pair::first))
-                  .collect(Collectors.toList());
-
-          globalApplySeqs = 
eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
-          globalDeletes = 
eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
-
-          List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
+                  .filter(file -> file.content() == 
FileContent.EQUALITY_DELETES)
+                  
.sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber))
+                  .toArray(IndexedDeleteFile[]::new);
+          if (eqFilesSortedBySeq.length > 0) {
+            globalDeletes = new DeleteFileGroup(eqFilesSortedBySeq);
+          }
+
+          IndexedDeleteFile[] posFilesSortedBySeq =
               deleteFilesByPartition.get(partition).stream()
-                  .filter(entry -> entry.file().content() == 
FileContent.POSITION_DELETES)
-                  .map(entry -> Pair.of(entry.dataSequenceNumber(), 
entry.file()))
-                  .sorted(Comparator.comparingLong(Pair::first))
-                  .collect(Collectors.toList());
-
-          long[] seqs = 
posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
-          DeleteFile[] files =
-              
posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
-
-          sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
+                  .filter(file -> file.content() == 
FileContent.POSITION_DELETES)
+                  
.sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber))
+                  .toArray(IndexedDeleteFile[]::new);
+          sortedDeletesByPartition.put(partition, new 
DeleteFileGroup(posFilesSortedBySeq));
 
         } else {
-          List<Pair<Long, DeleteFile>> filesSortedBySeq =
+          IndexedDeleteFile[] filesSortedBySeq =
               deleteFilesByPartition.get(partition).stream()
-                  .map(
-                      entry -> {
-                        // a delete file is indexed by the sequence number it 
should be applied to
-                        long applySeq =
-                            entry.dataSequenceNumber()
-                                - (entry.file().content() == 
FileContent.EQUALITY_DELETES ? 1 : 0);
-                        return Pair.of(applySeq, entry.file());
-                      })
-                  .sorted(Comparator.comparingLong(Pair::first))
-                  .collect(Collectors.toList());
-
-          long[] seqs = 
filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
-          DeleteFile[] files =
-              
filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
-
-          sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
+                  
.sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber))
+                  .toArray(IndexedDeleteFile[]::new);
+          sortedDeletesByPartition.put(partition, new 
DeleteFileGroup(filesSortedBySeq));
         }
       }
 
-      scanMetrics.indexedDeleteFiles().increment(deleteEntries.size());
+      scanMetrics.indexedDeleteFiles().increment(files.size());
       deleteFilesByPartition
           .values()
           .forEach(
-              entry -> {
-                FileContent content = entry.file().content();
+              file -> {
+                FileContent content = file.content();
                 if (content == FileContent.EQUALITY_DELETES) {
                   scanMetrics.equalityDeleteFiles().increment();
                 } else if (content == FileContent.POSITION_DELETES) {
@@ -522,8 +509,7 @@ class DeleteFileIndex {
                 }
               });
 
-      return new DeleteFileIndex(
-          specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
+      return new DeleteFileIndex(specsById, globalDeletes, 
sortedDeletesByPartition);
     }
 
     private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> 
deleteManifestReaders() {
@@ -569,4 +555,171 @@ class DeleteFileIndex {
                   .liveEntries());
     }
   }
+
+  // a group of indexed delete files sorted by the sequence number they apply to
+  private static class DeleteFileGroup {
+    private final long[] seqs;
+    private final IndexedDeleteFile[] files;
+
+    DeleteFileGroup(IndexedDeleteFile[] files) {
+      this.seqs = 
Arrays.stream(files).mapToLong(IndexedDeleteFile::applySequenceNumber).toArray();
+      this.files = files;
+    }
+
+    DeleteFileGroup(long[] seqs, IndexedDeleteFile[] files) {
+      this.seqs = seqs;
+      this.files = files;
+    }
+
+    public Stream<IndexedDeleteFile> limit(long seq) {
+      int pos = Arrays.binarySearch(seqs, seq);
+      int start;
+      if (pos < 0) {
+        // the sequence number was not found, where it would be inserted is 
-(pos + 1)
+        start = -(pos + 1);
+      } else {
+        // the sequence number was found, but may not be the first
+        // find the first delete file with the given sequence number by 
decrementing the position
+        start = pos;
+        while (start > 0 && seqs[start - 1] >= seq) {
+          start -= 1;
+        }
+      }
+
+      return Arrays.stream(files, start, files.length);
+    }
+
+    public Iterable<DeleteFile> referencedDeleteFiles() {
+      return 
Arrays.stream(files).map(IndexedDeleteFile::wrapped).collect(Collectors.toList());
+    }
+  }
+
+  // a delete file wrapper that caches the converted boundaries for faster boundary checks
+  // this class is not meant to be exposed beyond the delete file index
+  private static class IndexedDeleteFile {
+    private final PartitionSpec spec;
+    private final DeleteFile wrapped;
+    private final long applySequenceNumber;
+    private volatile Map<Integer, Object> convertedLowerBounds = null;
+    private volatile Map<Integer, Object> convertedUpperBounds = null;
+
+    IndexedDeleteFile(PartitionSpec spec, DeleteFile file, long 
applySequenceNumber) {
+      this.spec = spec;
+      this.wrapped = file;
+      this.applySequenceNumber = applySequenceNumber;
+    }
+
+    IndexedDeleteFile(PartitionSpec spec, DeleteFile file) {
+      this.spec = spec;
+      this.wrapped = file;
+
+      if (file.content() == FileContent.EQUALITY_DELETES) {
+        this.applySequenceNumber = file.dataSequenceNumber() - 1;
+      } else {
+        this.applySequenceNumber = file.dataSequenceNumber();
+      }
+    }
+
+    public PartitionSpec spec() {
+      return spec;
+    }
+
+    public DeleteFile wrapped() {
+      return wrapped;
+    }
+
+    public long applySequenceNumber() {
+      return applySequenceNumber;
+    }
+
+    public FileContent content() {
+      return wrapped.content();
+    }
+
+    public List<Integer> equalityFieldIds() {
+      return wrapped.equalityFieldIds();
+    }
+
+    public Map<Integer, Long> valueCounts() {
+      return wrapped.valueCounts();
+    }
+
+    public Map<Integer, Long> nullValueCounts() {
+      return wrapped.nullValueCounts();
+    }
+
+    public Map<Integer, Long> nanValueCounts() {
+      return wrapped.nanValueCounts();
+    }
+
+    public boolean hasNoLowerOrUpperBounds() {
+      return wrapped.lowerBounds() == null || wrapped.upperBounds() == null;
+    }
+
+    public boolean hasLowerAndUpperBounds() {
+      return wrapped.lowerBounds() != null && wrapped.upperBounds() != null;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T lowerBound(int id) {
+      return (T) lowerBounds().get(id);
+    }
+
+    private Map<Integer, Object> lowerBounds() {
+      if (convertedLowerBounds == null) {
+        synchronized (this) {
+          if (convertedLowerBounds == null) {
+            this.convertedLowerBounds = convertBounds(wrapped.lowerBounds());
+          }
+        }
+      }
+
+      return convertedLowerBounds;
+    }
+
+    @SuppressWarnings("unchecked")
+    public <T> T upperBound(int id) {
+      return (T) upperBounds().get(id);
+    }
+
+    private Map<Integer, Object> upperBounds() {
+      if (convertedUpperBounds == null) {
+        synchronized (this) {
+          if (convertedUpperBounds == null) {
+            this.convertedUpperBounds = convertBounds(wrapped.upperBounds());
+          }
+        }
+      }
+
+      return convertedUpperBounds;
+    }
+
+    private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> 
bounds) {
+      Map<Integer, Object> converted = Maps.newHashMap();
+
+      if (bounds != null) {
+        if (wrapped.content() == FileContent.POSITION_DELETES) {
+          Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+          int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+          ByteBuffer bound = bounds.get(pathId);
+          if (bound != null) {
+            converted.put(pathId, Conversions.fromByteBuffer(pathType, bound));
+          }
+
+        } else {
+          for (int id : equalityFieldIds()) {
+            Type type = spec.schema().findField(id).type();
+            if (type.isPrimitiveType()) {
+              ByteBuffer bound = bounds.get(id);
+              if (bound != null) {
+                converted.put(id, Conversions.fromByteBuffer(type, bound));
+              }
+            }
+          }
+        }
+      }
+
+      return converted;
+    }
+  }
 }
diff --git a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
new file mode 100644
index 0000000000..6e75b6aa00
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.lit;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that evaluates the job planning performance.
+ *
+ * <p>Setup creates an Iceberg table partitioned by {@code ss_ticket_number} and, for each of the
+ * {@code NUM_PARTITIONS} partition values, commits {@code NUM_DATA_FILES_PER_PARTITION} data file
+ * entries and {@code NUM_DELETE_FILES_PER_PARTITION} delete file entries that are copies of one
+ * written file registered under different paths. Each benchmark method then plans the input
+ * partitions of a {@code SELECT *} query with a different filter.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ *   ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ *       -PjmhIncludeRegex=PlanningBenchmark
+ *       -PjmhOutputPath=benchmark/iceberg-planning-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@Timeout(time = 20, timeUnit = TimeUnit.MINUTES)
+@BenchmarkMode(Mode.SingleShotTime)
+public class PlanningBenchmark {
+
+  private static final String TABLE_NAME = "test_table";
+  private static final String PARTITION_COLUMN = "ss_ticket_number";
+  private static final int PARTITION_VALUE = 10;
+  private static final String SORT_KEY_COLUMN = "ss_sold_date_sk";
+  private static final int SORT_KEY_VALUE = 5;
+
+  // the compared values are ints, so both predicates format them with %d
+  private static final String SORT_KEY_PREDICATE =
+      String.format("%s = %d", SORT_KEY_COLUMN, SORT_KEY_VALUE);
+  private static final String PARTITION_AND_SORT_KEY_PREDICATE =
+      String.format(
+          "%s = %d AND %s = %d",
+          PARTITION_COLUMN, PARTITION_VALUE, SORT_KEY_COLUMN, SORT_KEY_VALUE);
+
+  private static final int NUM_PARTITIONS = 30;
+  private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
+  private static final int NUM_DELETE_FILES_PER_PARTITION = 50;
+  private static final int NUM_ROWS_PER_DATA_FILE = 500;
+
+  private final Configuration hadoopConf = new Configuration();
+  private SparkSession spark;
+  private Table table;
+
+  /** Starts Spark, creates the table and fills it with data and delete files. */
+  @Setup
+  public void setupBenchmark() throws NoSuchTableException, ParseException {
+    setupSpark();
+    initTable();
+    initDataAndDeletes();
+  }
+
+  /** Drops the table and stops Spark. */
+  @TearDown
+  public void tearDownBenchmark() {
+    dropTable();
+    tearDownSpark();
+  }
+
+  /** Plans a scan filtered on both the partition column and the sort key column. */
+  @Benchmark
+  @Threads(1)
+  public void localPlanningWithPartitionAndMinMaxFilter(Blackhole blackhole) {
+    InputPartition[] partitions = planInputPartitions(PARTITION_AND_SORT_KEY_PREDICATE);
+    blackhole.consume(partitions);
+  }
+
+  /** Plans a scan filtered on the sort key column only. */
+  @Benchmark
+  @Threads(1)
+  public void localPlanningWithMinMaxFilter(Blackhole blackhole) {
+    InputPartition[] partitions = planInputPartitions(SORT_KEY_PREDICATE);
+    blackhole.consume(partitions);
+  }
+
+  /** Plans a scan with the always-true predicate {@code true}. */
+  @Benchmark
+  @Threads(1)
+  public void localPlanningWithoutFilter(Blackhole blackhole) {
+    InputPartition[] partitions = planInputPartitions("true");
+    blackhole.consume(partitions);
+  }
+
+  /**
+   * Creates a local Spark session with the Iceberg extensions and a Hadoop-type session catalog
+   * whose warehouse is a new directory from {@link #newWarehouseDir()}.
+   */
+  private void setupSpark() {
+    this.spark =
+        SparkSession.builder()
+            .config("spark.ui.enabled", false)
+            .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+            .config("spark.sql.extensions", IcebergSparkSessionExtensions.class.getName())
+            .config("spark.sql.catalog.spark_catalog", SparkSessionCatalog.class.getName())
+            .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+            .config("spark.sql.catalog.spark_catalog.warehouse", newWarehouseDir())
+            .master("local[*]")
+            .getOrCreate();
+  }
+
+  private void tearDownSpark() {
+    spark.stop();
+  }
+
+  /**
+   * Creates the benchmark table partitioned by {@code PARTITION_COLUMN}, sets the delete mode
+   * to merge-on-read and the format version to 2, and loads it into {@code table}.
+   */
+  private void initTable() throws NoSuchTableException, ParseException {
+    sql(
+        "CREATE TABLE %s ( "
+            + " `ss_sold_date_sk` INT, "
+            + " `ss_sold_time_sk` INT, "
+            + " `ss_item_sk` INT, "
+            + " `ss_customer_sk` STRING, "
+            + " `ss_cdemo_sk` STRING, "
+            + " `ss_hdemo_sk` STRING, "
+            + " `ss_addr_sk` STRING, "
+            + " `ss_store_sk` STRING, "
+            + " `ss_promo_sk` STRING, "
+            + " `ss_ticket_number` INT, "
+            + " `ss_quantity` STRING, "
+            + " `ss_wholesale_cost` STRING, "
+            + " `ss_list_price` STRING, "
+            + " `ss_sales_price` STRING, "
+            + " `ss_ext_discount_amt` STRING, "
+            + " `ss_ext_sales_price` STRING, "
+            + " `ss_ext_wholesale_cost` STRING, "
+            + " `ss_ext_list_price` STRING, "
+            + " `ss_ext_tax` STRING, "
+            + " `ss_coupon_amt` STRING, "
+            + " `ss_net_paid` STRING, "
+            + " `ss_net_paid_inc_tax` STRING, "
+            + " `ss_net_profit` STRING "
+            + ")"
+            + "USING iceberg "
+            + "PARTITIONED BY (%s) "
+            + "TBLPROPERTIES ("
+            + " '%s' '%s',"
+            + " '%s' '%d')",
+        TABLE_NAME,
+        PARTITION_COLUMN,
+        TableProperties.DELETE_MODE,
+        RowLevelOperationMode.MERGE_ON_READ.modeName(),
+        TableProperties.FORMAT_VERSION,
+        2);
+
+    this.table = Spark3Util.loadIcebergTable(spark, TABLE_NAME);
+  }
+
+  private void dropTable() {
+    sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
+  }
+
+  /**
+   * Refreshes the table and returns the data file added by the current snapshot; expects that
+   * snapshot to have added exactly one data file.
+   */
+  private DataFile loadAddedDataFile() {
+    table.refresh();
+
+    Iterable<DataFile> dataFiles = table.currentSnapshot().addedDataFiles(table.io());
+    return Iterables.getOnlyElement(dataFiles);
+  }
+
+  /**
+   * Refreshes the table and returns the delete file added by the current snapshot; expects that
+   * snapshot to have added exactly one delete file.
+   */
+  private DeleteFile loadAddedDeleteFile() {
+    table.refresh();
+
+    Iterable<DeleteFile> deleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
+    return Iterables.getOnlyElement(deleteFiles);
+  }
+
+  /**
+   * Populates every partition with one written data file, one delete file produced by a DELETE
+   * statement, many metadata copies of both, and one more written data file whose sort key
+   * column holds {@code SORT_KEY_VALUE} in every row.
+   */
+  private void initDataAndDeletes() throws NoSuchTableException {
+    for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
+      // write one data file for this partition and pick up its metadata
+      Dataset<Row> inputDF =
+          randomDataDF(table.schema(), NUM_ROWS_PER_DATA_FILE)
+              .drop(PARTITION_COLUMN)
+              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+      appendAsFile(inputDF);
+
+      DataFile dataFile = loadAddedDataFile();
+
+      // run a row-level DELETE in this partition and pick up the delete file it added
+      sql(
+          "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
+          TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
+
+      DeleteFile deleteFile = loadAddedDeleteFile();
+
+      // register copies of the data file under new paths; nothing is written for them here
+      AppendFiles append = table.newFastAppend();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+        DataFile replicaDataFile =
+            DataFiles.builder(table.spec())
+                .copy(dataFile)
+                .withPath("replica-" + fileOrdinal + "-" + dataFile.path())
+                .build();
+        append.appendFile(replicaDataFile);
+      }
+
+      append.commit();
+
+      // same for the delete file, committed as a row delta
+      RowDelta rowDelta = table.newRowDelta();
+
+      for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
+        DeleteFile replicaDeleteFile =
+            FileMetadata.deleteFileBuilder(table.spec())
+                .copy(deleteFile)
+                .withPath("replica-" + fileOrdinal + "-" + deleteFile.path())
+                .build();
+        rowDelta.addDeletes(replicaDeleteFile);
+      }
+
+      rowDelta.commit();
+
+      // NOTE(review): presumably this file is the one the sort key predicates are meant to
+      // match, since its sort key column is constant -- confirm
+      Dataset<Row> sortedInputDF =
+          randomDataDF(table.schema(), NUM_ROWS_PER_DATA_FILE)
+              .drop(SORT_KEY_COLUMN)
+              .withColumn(SORT_KEY_COLUMN, lit(SORT_KEY_VALUE))
+              .drop(PARTITION_COLUMN)
+              .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+      appendAsFile(sortedInputDF);
+    }
+  }
+
+  /** Appends the rows to the table after coalescing them into a single Spark partition. */
+  private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
+    df.coalesce(1).writeTo(TABLE_NAME).append();
+  }
+
+  /** Returns a new warehouse location: a random UUID directory under {@code hadoop.tmp.dir}. */
+  private String newWarehouseDir() {
+    // the separator keeps the warehouse inside the temp dir instead of creating a sibling
+    // directory whose name is the temp dir name with the UUID glued on
+    return hadoopConf.get("hadoop.tmp.dir") + "/" + UUID.randomUUID();
+  }
+
+  /**
+   * Generates {@code numRows} random rows for the schema and exposes them as a DataFrame.
+   *
+   * <p>NOTE(review): the last argument of {@code RandomData.generateSpark} is presumably a seed,
+   * which would make every call produce the same rows -- confirm.
+   */
+  private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+    Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+    JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+    JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
+    StructType rowSparkType = SparkSchemaUtil.convert(schema);
+    return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
+  }
+
+  /**
+   * Plans {@code SELECT * FROM <table> WHERE <predicate>} and returns the input partitions of
+   * its scan.
+   *
+   * <p>The first leaf of the optimized plan is cast to {@code DataSourceV2ScanRelation}, so the
+   * call fails with a {@code ClassCastException} if that leaf is any other node type.
+   */
+  private InputPartition[] planInputPartitions(String predicate) {
+    DataSourceV2ScanRelation relation =
+        (DataSourceV2ScanRelation)
+            spark
+                .sql(String.format("SELECT * FROM %s WHERE %s", TABLE_NAME, predicate))
+                .queryExecution()
+                .optimizedPlan()
+                .collectLeaves()
+                .head();
+    return relation.scan().toBatch().planInputPartitions();
+  }
+
+  /** Formats the query with {@code String.format} and runs it through the Spark session. */
+  @FormatMethod
+  private void sql(@FormatString String query, Object... args) {
+    spark.sql(String.format(query, args));
+  }
+}


Reply via email to