This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 9cc9a5d90f Core: Optimize DeleteFileIndex (#8157)
9cc9a5d90f is described below
commit 9cc9a5d90fab4707e6bb0b7423c3a730a77c31b0
Author: Anton Okolnychyi <[email protected]>
AuthorDate: Wed Jul 26 23:10:03 2023 -0700
Core: Optimize DeleteFileIndex (#8157)
---
.../java/org/apache/iceberg/DeleteFileIndex.java | 435 ++++++++++++++-------
.../apache/iceberg/spark/PlanningBenchmark.java | 301 ++++++++++++++
2 files changed, 595 insertions(+), 141 deletions(-)
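
The change replaces the Pair<long[], DeleteFile[]> structures previously kept per
partition with a DeleteFileGroup that stores delete files sorted by the sequence
number they apply to, wraps each delete file in an IndexedDeleteFile that caches
converted bounds, and short-circuits lookups when the index is empty. A minimal,
self-contained sketch of the binary-search narrowing idea follows; the class and
method names here are illustrative only, not part of the commit:

    // Sketch: given delete files sorted by the sequence number they apply to,
    // one binary search finds the first file that can affect a data file written
    // at dataSeq; everything before that position is skipped without filtering.
    import java.util.Arrays;

    public class LimitSketch {
      static int firstApplicable(long[] seqs, long dataSeq) {
        int pos = Arrays.binarySearch(seqs, dataSeq);
        if (pos < 0) {
          return -(pos + 1); // not found: -(pos + 1) is the insertion point
        }
        // found, but possibly not the first duplicate; rewind to the first
        while (pos > 0 && seqs[pos - 1] >= dataSeq) {
          pos -= 1;
        }
        return pos;
      }

      public static void main(String[] args) {
        long[] seqs = {1, 3, 3, 3, 7};
        System.out.println(firstApplicable(seqs, 3)); // prints 1
        System.out.println(firstApplicable(seqs, 4)); // prints 4
      }
    }
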
diff --git a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
index eedde21397..9d54348439 100644
--- a/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
+++ b/core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
@@ -66,43 +66,48 @@ import org.apache.iceberg.util.Tasks;
* file.
*/
class DeleteFileIndex {
- private final Map<Integer, PartitionSpec> specsById;
+ private static final DeleteFile[] NO_DELETES = new DeleteFile[0];
+
private final Map<Integer, Types.StructType> partitionTypeById;
private final Map<Integer, ThreadLocal<StructLikeWrapper>> wrapperById;
- private final long[] globalSeqs;
- private final DeleteFile[] globalDeletes;
- private final Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>>
- sortedDeletesByPartition;
+ private final DeleteFileGroup globalDeletes;
+ private final Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition;
+ private final boolean isEmpty;
DeleteFileIndex(
- Map<Integer, PartitionSpec> specsById,
+ Map<Integer, PartitionSpec> specs,
long[] globalSeqs,
DeleteFile[] globalDeletes,
- Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition) {
- this.specsById = specsById;
+ Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> deletesByPartition) {
+ this(specs, index(specs, globalSeqs, globalDeletes), index(specs, deletesByPartition));
+ }
+
+ private DeleteFileIndex(
+ Map<Integer, PartitionSpec> specs,
+ DeleteFileGroup globalDeletes,
+ Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> deletesByPartition) {
ImmutableMap.Builder<Integer, Types.StructType> builder = ImmutableMap.builder();
- specsById.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
+ specs.forEach((specId, spec) -> builder.put(specId, spec.partitionType()));
this.partitionTypeById = builder.build();
this.wrapperById = Maps.newConcurrentMap();
- this.globalSeqs = globalSeqs;
this.globalDeletes = globalDeletes;
- this.sortedDeletesByPartition = sortedDeletesByPartition;
+ this.deletesByPartition = deletesByPartition;
+ this.isEmpty = globalDeletes == null && deletesByPartition.isEmpty();
}
public boolean isEmpty() {
- return (globalDeletes == null || globalDeletes.length == 0)
- && sortedDeletesByPartition.isEmpty();
+ return isEmpty;
}
public Iterable<DeleteFile> referencedDeleteFiles() {
Iterable<DeleteFile> deleteFiles = Collections.emptyList();
if (globalDeletes != null) {
- deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(globalDeletes));
+ deleteFiles = Iterables.concat(deleteFiles, globalDeletes.referencedDeleteFiles());
}
- for (Pair<long[], DeleteFile[]> partitionDeletes : sortedDeletesByPartition.values()) {
- deleteFiles = Iterables.concat(deleteFiles, Arrays.asList(partitionDeletes.second()));
+ for (DeleteFileGroup partitionDeletes : deletesByPartition.values()) {
+ deleteFiles = Iterables.concat(deleteFiles, partitionDeletes.referencedDeleteFiles());
}
return deleteFiles;
@@ -123,64 +128,63 @@ class DeleteFileIndex {
}
DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
+ if (isEmpty) {
+ return NO_DELETES;
+ }
+
Pair<Integer, StructLikeWrapper> partition = partition(file.specId(), file.partition());
- Pair<long[], DeleteFile[]> partitionDeletes = sortedDeletesByPartition.get(partition);
+ DeleteFileGroup partitionDeletes = deletesByPartition.get(partition);
+
+ if (globalDeletes == null && partitionDeletes == null) {
+ return NO_DELETES;
+ }
- Stream<DeleteFile> matchingDeletes;
+ Stream<IndexedDeleteFile> matchingDeletes;
if (partitionDeletes == null) {
- matchingDeletes = limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes);
+ matchingDeletes = globalDeletes.limit(sequenceNumber);
} else if (globalDeletes == null) {
- matchingDeletes =
- limitBySequenceNumber(
- sequenceNumber, partitionDeletes.first(), partitionDeletes.second());
+ matchingDeletes = partitionDeletes.limit(sequenceNumber);
} else {
- matchingDeletes =
- Stream.concat(
- limitBySequenceNumber(sequenceNumber, globalSeqs, globalDeletes),
- limitBySequenceNumber(
- sequenceNumber, partitionDeletes.first(), partitionDeletes.second()));
+ Stream<IndexedDeleteFile> matchingGlobalDeletes = globalDeletes.limit(sequenceNumber);
+ Stream<IndexedDeleteFile> matchingPartitionDeletes = partitionDeletes.limit(sequenceNumber);
+ matchingDeletes = Stream.concat(matchingGlobalDeletes, matchingPartitionDeletes);
}
return matchingDeletes
- .filter(
- deleteFile ->
- canContainDeletesForFile(file, deleteFile, specsById.get(file.specId()).schema()))
+ .filter(deleteFile -> canContainDeletesForFile(file, deleteFile))
+ .map(IndexedDeleteFile::wrapped)
.toArray(DeleteFile[]::new);
}
- private static boolean canContainDeletesForFile(
- DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+ private static boolean canContainDeletesForFile(DataFile dataFile, IndexedDeleteFile deleteFile) {
switch (deleteFile.content()) {
case POSITION_DELETES:
return canContainPosDeletesForFile(dataFile, deleteFile);
case EQUALITY_DELETES:
- return canContainEqDeletesForFile(dataFile, deleteFile, schema);
+ return canContainEqDeletesForFile(dataFile, deleteFile, deleteFile.spec().schema());
}
return true;
}
- private static boolean canContainPosDeletesForFile(DataFile dataFile, DeleteFile deleteFile) {
+ private static boolean canContainPosDeletesForFile(
+ DataFile dataFile, IndexedDeleteFile deleteFile) {
// check that the delete file can contain the data file's file_path
- Map<Integer, ByteBuffer> lowers = deleteFile.lowerBounds();
- Map<Integer, ByteBuffer> uppers = deleteFile.upperBounds();
- if (lowers == null || uppers == null) {
+ if (deleteFile.hasNoLowerOrUpperBounds()) {
return true;
}
- Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
Comparator<CharSequence> comparator = Comparators.charSequences();
- ByteBuffer lower = lowers.get(pathId);
- if (lower != null
- && comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, lower)) < 0) {
+
+ CharSequence lower = deleteFile.lowerBound(pathId);
+ if (lower != null && comparator.compare(dataFile.path(), lower) < 0) {
return false;
}
- ByteBuffer upper = uppers.get(pathId);
- if (upper != null
- && comparator.compare(dataFile.path(), Conversions.fromByteBuffer(pathType, upper)) > 0) {
+ CharSequence upper = deleteFile.upperBound(pathId);
+ if (upper != null && comparator.compare(dataFile.path(), upper) > 0) {
return false;
}
@@ -189,20 +193,15 @@ class DeleteFileIndex {
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private static boolean canContainEqDeletesForFile(
- DataFile dataFile, DeleteFile deleteFile, Schema schema) {
+ DataFile dataFile, IndexedDeleteFile deleteFile, Schema schema) {
+ Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
+ Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
+
// whether to check data ranges or to assume that the ranges match
// if upper/lower bounds are missing, null counts may still be used to determine delete files
// can be skipped
boolean checkRanges =
- dataFile.lowerBounds() != null
- && dataFile.upperBounds() != null
- && deleteFile.lowerBounds() != null
- && deleteFile.upperBounds() != null;
-
- Map<Integer, ByteBuffer> dataLowers = dataFile.lowerBounds();
- Map<Integer, ByteBuffer> dataUppers = dataFile.upperBounds();
- Map<Integer, ByteBuffer> deleteLowers = deleteFile.lowerBounds();
- Map<Integer, ByteBuffer> deleteUppers = deleteFile.upperBounds();
+ dataLowers != null && dataUppers != null && deleteFile.hasLowerAndUpperBounds();
Map<Integer, Long> dataNullCounts = dataFile.nullValueCounts();
Map<Integer, Long> dataValueCounts = dataFile.valueCounts();
@@ -241,15 +240,14 @@ class DeleteFileIndex {
ByteBuffer dataLower = dataLowers.get(id);
ByteBuffer dataUpper = dataUppers.get(id);
- ByteBuffer deleteLower = deleteLowers.get(id);
- ByteBuffer deleteUpper = deleteUppers.get(id);
+ Object deleteLower = deleteFile.lowerBound(id);
+ Object deleteUpper = deleteFile.upperBound(id);
if (dataLower == null || dataUpper == null || deleteLower == null || deleteUpper == null) {
// at least one bound is not known, assume the delete file may match
continue;
}
- if (!rangesOverlap(
- field.type().asPrimitiveType(), dataLower, dataUpper, deleteLower, deleteUpper)) {
+ if (!rangesOverlap(field, dataLower, dataUpper, deleteLower, deleteUpper)) {
// no values overlap between the data file and the deletes
return false;
}
@@ -259,19 +257,25 @@ class DeleteFileIndex {
}
private static <T> boolean rangesOverlap(
- Type.PrimitiveType type,
+ Types.NestedField field,
ByteBuffer dataLowerBuf,
ByteBuffer dataUpperBuf,
- ByteBuffer deleteLowerBuf,
- ByteBuffer deleteUpperBuf) {
+ T deleteLower,
+ T deleteUpper) {
+ Type.PrimitiveType type = field.type().asPrimitiveType();
Comparator<T> comparator = Comparators.forType(type);
+
T dataLower = Conversions.fromByteBuffer(type, dataLowerBuf);
+ if (comparator.compare(dataLower, deleteUpper) > 0) {
+ return false;
+ }
+
T dataUpper = Conversions.fromByteBuffer(type, dataUpperBuf);
- T deleteLower = Conversions.fromByteBuffer(type, deleteLowerBuf);
- T deleteUpper = Conversions.fromByteBuffer(type, deleteUpperBuf);
+ if (comparator.compare(deleteLower, dataUpper) > 0) {
+ return false;
+ }
- return comparator.compare(deleteLower, dataUpper) <= 0
- && comparator.compare(dataLower, deleteUpper) <= 0;
+ return true;
}
private static boolean allNonNull(Map<Integer, Long> nullValueCounts, Types.NestedField field) {
@@ -327,27 +331,35 @@ class DeleteFileIndex {
return nullValueCount > 0;
}
- private static Stream<DeleteFile> limitBySequenceNumber(
- long sequenceNumber, long[] seqs, DeleteFile[] files) {
- if (files == null) {
- return Stream.empty();
+ private static DeleteFileGroup index(
+ Map<Integer, PartitionSpec> specs, Pair<long[], DeleteFile[]> pairs) {
+ return index(specs, pairs.first(), pairs.second());
+ }
+
+ private static DeleteFileGroup index(
+ Map<Integer, PartitionSpec> specs, long[] seqs, DeleteFile[] files) {
+ if (files == null || files.length == 0) {
+ return null;
}
- int pos = Arrays.binarySearch(seqs, sequenceNumber);
- int start;
- if (pos < 0) {
- // the sequence number was not found, where it would be inserted is -(pos + 1)
- start = -(pos + 1);
- } else {
- // the sequence number was found, but may not be the first
- // find the first delete file with the given sequence number by
decrementing the position
- start = pos;
- while (start > 0 && seqs[start - 1] >= sequenceNumber) {
- start -= 1;
- }
+ IndexedDeleteFile[] indexedGlobalDeleteFiles = new IndexedDeleteFile[files.length];
+
+ for (int pos = 0; pos < files.length; pos++) {
+ DeleteFile file = files[pos];
+ PartitionSpec spec = specs.get(file.specId());
+ long applySequenceNumber = seqs[pos];
+ indexedGlobalDeleteFiles[pos] = new IndexedDeleteFile(spec, file, applySequenceNumber);
}
- return Arrays.stream(files, start, files.length);
+ return new DeleteFileGroup(seqs, indexedGlobalDeleteFiles);
+ }
+
+ private static Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> index(
+ Map<Integer, PartitionSpec> specs,
+ Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> deletesByPartition) {
+ Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> indexed = Maps.newHashMap();
+ deletesByPartition.forEach((key, value) -> indexed.put(key, index(specs, value)));
+ return indexed;
}
static Builder builderFor(FileIO io, Iterable<ManifestFile> deleteManifests) {
@@ -414,7 +426,7 @@ class DeleteFileIndex {
DeleteFileIndex build() {
// read all of the matching delete manifests in parallel and accumulate the matching files in
// a queue
- Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
+ Queue<DeleteFile> files = new ConcurrentLinkedQueue<>();
Tasks.foreach(deleteManifestReaders())
.stopOnFailure()
.throwFailureWhenFinished()
@@ -425,7 +437,7 @@ class DeleteFileIndex {
for (ManifestEntry<DeleteFile> entry : reader) {
if (entry.dataSequenceNumber() > minSequenceNumber) {
// copy with stats for better filtering against data file stats
- deleteEntries.add(entry.copy());
+ files.add(entry.file().copy());
}
}
} catch (IOException e) {
@@ -435,86 +447,61 @@ class DeleteFileIndex {
// build a map from (specId, partition) to delete file entries
Map<Integer, StructLikeWrapper> wrappersBySpecId = Maps.newHashMap();
- ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>>
- deleteFilesByPartition =
- Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
- for (ManifestEntry<DeleteFile> entry : deleteEntries) {
- int specId = entry.file().specId();
+ ListMultimap<Pair<Integer, StructLikeWrapper>, IndexedDeleteFile> deleteFilesByPartition =
+ Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
+ for (DeleteFile file : files) {
+ int specId = file.specId();
+ PartitionSpec spec = specsById.get(specId);
StructLikeWrapper wrapper =
wrappersBySpecId
- .computeIfAbsent(
- specId, id -> StructLikeWrapper.forType(specsById.get(id).partitionType()))
- .copyFor(entry.file().partition());
- deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
+ .computeIfAbsent(specId, id -> StructLikeWrapper.forType(spec.partitionType()))
+ .copyFor(file.partition());
+ deleteFilesByPartition.put(Pair.of(specId, wrapper), new IndexedDeleteFile(spec, file));
}
// sort the entries in each map value by sequence number and split into sequence numbers and
// delete files lists
- Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition =
+ Map<Pair<Integer, StructLikeWrapper>, DeleteFileGroup> sortedDeletesByPartition =
Maps.newHashMap();
// also, separate out equality deletes in an unpartitioned spec that should be applied
// globally
- long[] globalApplySeqs = null;
- DeleteFile[] globalDeletes = null;
+ DeleteFileGroup globalDeletes = null;
for (Pair<Integer, StructLikeWrapper> partition : deleteFilesByPartition.keySet()) {
if (specsById.get(partition.first()).isUnpartitioned()) {
Preconditions.checkState(
globalDeletes == null, "Detected multiple partition specs with no partitions");
- List<Pair<Long, DeleteFile>> eqFilesSortedBySeq =
+ IndexedDeleteFile[] eqFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
- .filter(entry -> entry.file().content() == FileContent.EQUALITY_DELETES)
- .map(
- entry ->
- // a delete file is indexed by the sequence number it should be applied to
- Pair.of(entry.dataSequenceNumber() - 1, entry.file()))
- .sorted(Comparator.comparingLong(Pair::first))
- .collect(Collectors.toList());
-
- globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
- globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
-
- List<Pair<Long, DeleteFile>> posFilesSortedBySeq =
+ .filter(file -> file.content() == FileContent.EQUALITY_DELETES)
+ .sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber))
+ .toArray(IndexedDeleteFile[]::new);
+ if (eqFilesSortedBySeq.length > 0) {
+ globalDeletes = new DeleteFileGroup(eqFilesSortedBySeq);
+ }
+
+ IndexedDeleteFile[] posFilesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
- .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
- .map(entry -> Pair.of(entry.dataSequenceNumber(), entry.file()))
- .sorted(Comparator.comparingLong(Pair::first))
- .collect(Collectors.toList());
-
- long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
- DeleteFile[] files =
- posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
-
- sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
+ .filter(file -> file.content() == FileContent.POSITION_DELETES)
+ .sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber))
+ .toArray(IndexedDeleteFile[]::new);
+ sortedDeletesByPartition.put(partition, new DeleteFileGroup(posFilesSortedBySeq));
} else {
- List<Pair<Long, DeleteFile>> filesSortedBySeq =
+ IndexedDeleteFile[] filesSortedBySeq =
deleteFilesByPartition.get(partition).stream()
- .map(
- entry -> {
- // a delete file is indexed by the sequence number it should be applied to
- long applySeq =
- entry.dataSequenceNumber()
- - (entry.file().content() == FileContent.EQUALITY_DELETES ? 1 : 0);
- return Pair.of(applySeq, entry.file());
- })
- .sorted(Comparator.comparingLong(Pair::first))
- .collect(Collectors.toList());
-
- long[] seqs = filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
- DeleteFile[] files =
- filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);
-
- sortedDeletesByPartition.put(partition, Pair.of(seqs, files));
+ .sorted(Comparator.comparingLong(IndexedDeleteFile::applySequenceNumber))
+ .toArray(IndexedDeleteFile[]::new);
+ sortedDeletesByPartition.put(partition, new DeleteFileGroup(filesSortedBySeq));
}
}
- scanMetrics.indexedDeleteFiles().increment(deleteEntries.size());
+ scanMetrics.indexedDeleteFiles().increment(files.size());
deleteFilesByPartition
.values()
.forEach(
- entry -> {
- FileContent content = entry.file().content();
+ file -> {
+ FileContent content = file.content();
if (content == FileContent.EQUALITY_DELETES) {
scanMetrics.equalityDeleteFiles().increment();
} else if (content == FileContent.POSITION_DELETES) {
@@ -522,8 +509,7 @@ class DeleteFileIndex {
}
});
- return new DeleteFileIndex(
- specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
+ return new DeleteFileIndex(specsById, globalDeletes, sortedDeletesByPartition);
}
private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
@@ -569,4 +555,171 @@ class DeleteFileIndex {
.liveEntries());
}
}
+
+ // a group of indexed delete files sorted by the sequence number they apply to
+ private static class DeleteFileGroup {
+ private final long[] seqs;
+ private final IndexedDeleteFile[] files;
+
+ DeleteFileGroup(IndexedDeleteFile[] files) {
+ this.seqs = Arrays.stream(files).mapToLong(IndexedDeleteFile::applySequenceNumber).toArray();
+ this.files = files;
+ }
+
+ DeleteFileGroup(long[] seqs, IndexedDeleteFile[] files) {
+ this.seqs = seqs;
+ this.files = files;
+ }
+
+ public Stream<IndexedDeleteFile> limit(long seq) {
+ int pos = Arrays.binarySearch(seqs, seq);
+ int start;
+ if (pos < 0) {
+ // the sequence number was not found, where it would be inserted is -(pos + 1)
+ start = -(pos + 1);
+ } else {
+ // the sequence number was found, but may not be the first
+ // find the first delete file with the given sequence number by decrementing the position
+ start = pos;
+ while (start > 0 && seqs[start - 1] >= seq) {
+ start -= 1;
+ }
+ }
+
+ return Arrays.stream(files, start, files.length);
+ }
+
+ public Iterable<DeleteFile> referencedDeleteFiles() {
+ return Arrays.stream(files).map(IndexedDeleteFile::wrapped).collect(Collectors.toList());
+ }
+ }
+
+ // a delete file wrapper that caches the converted boundaries for faster boundary checks
+ // this class is not meant to be exposed beyond the delete file index
+ private static class IndexedDeleteFile {
+ private final PartitionSpec spec;
+ private final DeleteFile wrapped;
+ private final long applySequenceNumber;
+ private volatile Map<Integer, Object> convertedLowerBounds = null;
+ private volatile Map<Integer, Object> convertedUpperBounds = null;
+
+ IndexedDeleteFile(PartitionSpec spec, DeleteFile file, long applySequenceNumber) {
+ this.spec = spec;
+ this.wrapped = file;
+ this.applySequenceNumber = applySequenceNumber;
+ }
+
+ IndexedDeleteFile(PartitionSpec spec, DeleteFile file) {
+ this.spec = spec;
+ this.wrapped = file;
+
+ if (file.content() == FileContent.EQUALITY_DELETES) {
+ this.applySequenceNumber = file.dataSequenceNumber() - 1;
+ } else {
+ this.applySequenceNumber = file.dataSequenceNumber();
+ }
+ }
+
+ public PartitionSpec spec() {
+ return spec;
+ }
+
+ public DeleteFile wrapped() {
+ return wrapped;
+ }
+
+ public long applySequenceNumber() {
+ return applySequenceNumber;
+ }
+
+ public FileContent content() {
+ return wrapped.content();
+ }
+
+ public List<Integer> equalityFieldIds() {
+ return wrapped.equalityFieldIds();
+ }
+
+ public Map<Integer, Long> valueCounts() {
+ return wrapped.valueCounts();
+ }
+
+ public Map<Integer, Long> nullValueCounts() {
+ return wrapped.nullValueCounts();
+ }
+
+ public Map<Integer, Long> nanValueCounts() {
+ return wrapped.nanValueCounts();
+ }
+
+ public boolean hasNoLowerOrUpperBounds() {
+ return wrapped.lowerBounds() == null || wrapped.upperBounds() == null;
+ }
+
+ public boolean hasLowerAndUpperBounds() {
+ return wrapped.lowerBounds() != null && wrapped.upperBounds() != null;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T lowerBound(int id) {
+ return (T) lowerBounds().get(id);
+ }
+
+ private Map<Integer, Object> lowerBounds() {
+ if (convertedLowerBounds == null) {
+ synchronized (this) {
+ if (convertedLowerBounds == null) {
+ this.convertedLowerBounds = convertBounds(wrapped.lowerBounds());
+ }
+ }
+ }
+
+ return convertedLowerBounds;
+ }
+
+ @SuppressWarnings("unchecked")
+ public <T> T upperBound(int id) {
+ return (T) upperBounds().get(id);
+ }
+
+ private Map<Integer, Object> upperBounds() {
+ if (convertedUpperBounds == null) {
+ synchronized (this) {
+ if (convertedUpperBounds == null) {
+ this.convertedUpperBounds = convertBounds(wrapped.upperBounds());
+ }
+ }
+ }
+
+ return convertedUpperBounds;
+ }
+
+ private Map<Integer, Object> convertBounds(Map<Integer, ByteBuffer> bounds) {
+ Map<Integer, Object> converted = Maps.newHashMap();
+
+ if (bounds != null) {
+ if (wrapped.content() == FileContent.POSITION_DELETES) {
+ Type pathType = MetadataColumns.DELETE_FILE_PATH.type();
+ int pathId = MetadataColumns.DELETE_FILE_PATH.fieldId();
+ ByteBuffer bound = bounds.get(pathId);
+ if (bound != null) {
+ converted.put(pathId, Conversions.fromByteBuffer(pathType, bound));
+ }
+
+ } else {
+ for (int id : equalityFieldIds()) {
+ Type type = spec.schema().findField(id).type();
+ if (type.isPrimitiveType()) {
+ ByteBuffer bound = bounds.get(id);
+ if (bound != null) {
+ converted.put(id, Conversions.fromByteBuffer(type, bound));
+ }
+ }
+ }
+ }
+ }
+
+ return converted;
+ }
+ }
}
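
The IndexedDeleteFile wrapper above converts ByteBuffer bounds to comparable Java
objects at most once per delete file instead of on every forDataFile call. A
simplified sketch of that double-checked locking pattern follows (illustrative
names; the Supplier stands in for the commit's convertBounds logic):

    import java.util.Map;
    import java.util.function.Supplier;

    class LazyBounds {
      private final Supplier<Map<Integer, Object>> converter;
      private volatile Map<Integer, Object> converted = null;

      LazyBounds(Supplier<Map<Integer, Object>> converter) {
        this.converter = converter;
      }

      Map<Integer, Object> get() {
        if (converted == null) { // fast path: no lock once published
          synchronized (this) {
            if (converted == null) { // re-check under the lock
              this.converted = converter.get(); // convert exactly once
            }
          }
        }
        return converted;
      }
    }
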
diff --git a/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
new file mode 100644
index 0000000000..6e75b6aa00
--- /dev/null
+++ b/spark/v3.4/spark-extensions/src/jmh/java/org/apache/iceberg/spark/PlanningBenchmark.java
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark;
+
+import static org.apache.spark.sql.functions.lit;
+
+import com.google.errorprone.annotations.FormatMethod;
+import com.google.errorprone.annotations.FormatString;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.RowDelta;
+import org.apache.iceberg.RowLevelOperationMode;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.data.RandomData;
+import org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
+import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.connector.read.InputPartition;
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation;
+import org.apache.spark.sql.types.StructType;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Threads;
+import org.openjdk.jmh.annotations.Timeout;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.infra.Blackhole;
+
+/**
+ * A benchmark that evaluates the job planning performance.
+ *
+ * <p>To run this benchmark for spark-3.4: <code>
+ * ./gradlew -DsparkVersions=3.4 :iceberg-spark:iceberg-spark-extensions-3.4_2.12:jmh
+ * -PjmhIncludeRegex=PlanningBenchmark
+ * -PjmhOutputPath=benchmark/iceberg-planning-benchmark.txt
+ * </code>
+ */
+@Fork(1)
+@State(Scope.Benchmark)
+@Warmup(iterations = 3)
+@Measurement(iterations = 5)
+@Timeout(time = 20, timeUnit = TimeUnit.MINUTES)
+@BenchmarkMode(Mode.SingleShotTime)
+public class PlanningBenchmark {
+
+ private static final String TABLE_NAME = "test_table";
+ private static final String PARTITION_COLUMN = "ss_ticket_number";
+ private static final int PARTITION_VALUE = 10;
+ private static final String SORT_KEY_COLUMN = "ss_sold_date_sk";
+ private static final int SORT_KEY_VALUE = 5;
+
+ private static final String SORT_KEY_PREDICATE =
+ String.format("%s = %s", SORT_KEY_COLUMN, SORT_KEY_VALUE);
+ private static final String PARTITION_AND_SORT_KEY_PREDICATE =
+ String.format(
+ "%s = %d AND %s = %d",
+ PARTITION_COLUMN, PARTITION_VALUE, SORT_KEY_COLUMN, SORT_KEY_VALUE);
+
+ private static final int NUM_PARTITIONS = 30;
+ private static final int NUM_DATA_FILES_PER_PARTITION = 50_000;
+ private static final int NUM_DELETE_FILES_PER_PARTITION = 50;
+ private static final int NUM_ROWS_PER_DATA_FILE = 500;
+
+ private final Configuration hadoopConf = new Configuration();
+ private SparkSession spark;
+ private Table table;
+
+ @Setup
+ public void setupBenchmark() throws NoSuchTableException, ParseException {
+ setupSpark();
+ initTable();
+ initDataAndDeletes();
+ }
+
+ @TearDown
+ public void tearDownBenchmark() {
+ dropTable();
+ tearDownSpark();
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void localPlanningWithPartitionAndMinMaxFilter(Blackhole blackhole) {
+ InputPartition[] partitions = planInputPartitions(PARTITION_AND_SORT_KEY_PREDICATE);
+ blackhole.consume(partitions);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void localPlanningWithMinMaxFilter(Blackhole blackhole) {
+ InputPartition[] partitions = planInputPartitions(SORT_KEY_PREDICATE);
+ blackhole.consume(partitions);
+ }
+
+ @Benchmark
+ @Threads(1)
+ public void localPlanningWithoutFilter(Blackhole blackhole) {
+ InputPartition[] partitions = planInputPartitions("true");
+ blackhole.consume(partitions);
+ }
+
+ private void setupSpark() {
+ this.spark =
+ SparkSession.builder()
+ .config("spark.ui.enabled", false)
+ .config("spark.serializer",
"org.apache.spark.serializer.KryoSerializer")
+ .config("spark.sql.extensions",
IcebergSparkSessionExtensions.class.getName())
+ .config("spark.sql.catalog.spark_catalog",
SparkSessionCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.type", "hadoop")
+ .config("spark.sql.catalog.spark_catalog.warehouse",
newWarehouseDir())
+ .master("local[*]")
+ .getOrCreate();
+ }
+
+ private void tearDownSpark() {
+ spark.stop();
+ }
+
+ private void initTable() throws NoSuchTableException, ParseException {
+ sql(
+ "CREATE TABLE %s ( "
+ + " `ss_sold_date_sk` INT, "
+ + " `ss_sold_time_sk` INT, "
+ + " `ss_item_sk` INT, "
+ + " `ss_customer_sk` STRING, "
+ + " `ss_cdemo_sk` STRING, "
+ + " `ss_hdemo_sk` STRING, "
+ + " `ss_addr_sk` STRING, "
+ + " `ss_store_sk` STRING, "
+ + " `ss_promo_sk` STRING, "
+ + " `ss_ticket_number` INT, "
+ + " `ss_quantity` STRING, "
+ + " `ss_wholesale_cost` STRING, "
+ + " `ss_list_price` STRING, "
+ + " `ss_sales_price` STRING, "
+ + " `ss_ext_discount_amt` STRING, "
+ + " `ss_ext_sales_price` STRING, "
+ + " `ss_ext_wholesale_cost` STRING, "
+ + " `ss_ext_list_price` STRING, "
+ + " `ss_ext_tax` STRING, "
+ + " `ss_coupon_amt` STRING, "
+ + " `ss_net_paid` STRING, "
+ + " `ss_net_paid_inc_tax` STRING, "
+ + " `ss_net_profit` STRING "
+ + ")"
+ + "USING iceberg "
+ + "PARTITIONED BY (%s) "
+ + "TBLPROPERTIES ("
+ + " '%s' '%s',"
+ + " '%s' '%d')",
+ TABLE_NAME,
+ PARTITION_COLUMN,
+ TableProperties.DELETE_MODE,
+ RowLevelOperationMode.MERGE_ON_READ.modeName(),
+ TableProperties.FORMAT_VERSION,
+ 2);
+
+ this.table = Spark3Util.loadIcebergTable(spark, TABLE_NAME);
+ }
+
+ private void dropTable() {
+ sql("DROP TABLE IF EXISTS %s PURGE", TABLE_NAME);
+ }
+
+ private DataFile loadAddedDataFile() {
+ table.refresh();
+
+ Iterable<DataFile> dataFiles = table.currentSnapshot().addedDataFiles(table.io());
+ return Iterables.getOnlyElement(dataFiles);
+ }
+
+ private DeleteFile loadAddedDeleteFile() {
+ table.refresh();
+
+ Iterable<DeleteFile> deleteFiles = table.currentSnapshot().addedDeleteFiles(table.io());
+ return Iterables.getOnlyElement(deleteFiles);
+ }
+
+ private void initDataAndDeletes() throws NoSuchTableException {
+ for (int partitionOrdinal = 0; partitionOrdinal < NUM_PARTITIONS; partitionOrdinal++) {
+ Dataset<Row> inputDF =
+ randomDataDF(table.schema(), NUM_ROWS_PER_DATA_FILE)
+ .drop(PARTITION_COLUMN)
+ .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+ appendAsFile(inputDF);
+
+ DataFile dataFile = loadAddedDataFile();
+
+ sql(
+ "DELETE FROM %s WHERE ss_item_sk IS NULL AND %s = %d",
+ TABLE_NAME, PARTITION_COLUMN, partitionOrdinal);
+
+ DeleteFile deleteFile = loadAddedDeleteFile();
+
+ AppendFiles append = table.newFastAppend();
+
+ for (int fileOrdinal = 0; fileOrdinal < NUM_DATA_FILES_PER_PARTITION; fileOrdinal++) {
+ DataFile replicaDataFile =
+ DataFiles.builder(table.spec())
+ .copy(dataFile)
+ .withPath("replica-" + fileOrdinal + "-" + dataFile.path())
+ .build();
+ append.appendFile(replicaDataFile);
+ }
+
+ append.commit();
+
+ RowDelta rowDelta = table.newRowDelta();
+
+ for (int fileOrdinal = 0; fileOrdinal < NUM_DELETE_FILES_PER_PARTITION; fileOrdinal++) {
+ DeleteFile replicaDeleteFile =
+ FileMetadata.deleteFileBuilder(table.spec())
+ .copy(deleteFile)
+ .withPath("replica-" + fileOrdinal + "-" + deleteFile.path())
+ .build();
+ rowDelta.addDeletes(replicaDeleteFile);
+ }
+
+ rowDelta.commit();
+
+ Dataset<Row> sortedInputDF =
+ randomDataDF(table.schema(), NUM_ROWS_PER_DATA_FILE)
+ .drop(SORT_KEY_COLUMN)
+ .withColumn(SORT_KEY_COLUMN, lit(SORT_KEY_VALUE))
+ .drop(PARTITION_COLUMN)
+ .withColumn(PARTITION_COLUMN, lit(partitionOrdinal));
+ appendAsFile(sortedInputDF);
+ }
+ }
+
+ private void appendAsFile(Dataset<Row> df) throws NoSuchTableException {
+ df.coalesce(1).writeTo(TABLE_NAME).append();
+ }
+
+ private String newWarehouseDir() {
+ return hadoopConf.get("hadoop.tmp.dir") + UUID.randomUUID();
+ }
+
+ private Dataset<Row> randomDataDF(Schema schema, int numRows) {
+ Iterable<InternalRow> rows = RandomData.generateSpark(schema, numRows, 0);
+ JavaSparkContext context = JavaSparkContext.fromSparkContext(spark.sparkContext());
+ JavaRDD<InternalRow> rowRDD = context.parallelize(Lists.newArrayList(rows));
+ StructType rowSparkType = SparkSchemaUtil.convert(schema);
+ return spark.internalCreateDataFrame(JavaRDD.toRDD(rowRDD), rowSparkType, false);
+ }
+
+ private InputPartition[] planInputPartitions(String predicate) {
+ DataSourceV2ScanRelation relation =
+ (DataSourceV2ScanRelation)
+ spark
+ .sql(String.format("SELECT * FROM %s WHERE %s", TABLE_NAME,
predicate))
+ .queryExecution()
+ .optimizedPlan()
+ .collectLeaves()
+ .head();
+ return relation.scan().toBatch().planInputPartitions();
+ }
+
+ @FormatMethod
+ private void sql(@FormatString String query, Object... args) {
+ spark.sql(String.format(query, args));
+ }
+}
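
For scale: with the constants above, each benchmark iteration plans against roughly
NUM_PARTITIONS * NUM_DATA_FILES_PER_PARTITION = 30 * 50,000 = 1,500,000 data files
and NUM_PARTITIONS * NUM_DELETE_FILES_PER_PARTITION = 30 * 50 = 1,500 position
delete files, which is the workload the DeleteFileIndex changes are meant to speed
up. A back-of-the-envelope sketch (numbers follow directly from the constants):

    public class BenchmarkScale {
      public static void main(String[] args) {
        long dataFiles = 30L * 50_000; // NUM_PARTITIONS * NUM_DATA_FILES_PER_PARTITION
        long deleteFiles = 30L * 50;   // NUM_PARTITIONS * NUM_DELETE_FILES_PER_PARTITION
        System.out.printf("~%,d data files, ~%,d delete files%n", dataFiles, deleteFiles);
      }
    }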