This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 89262460 feat: implement delete file index (#435)
89262460 is described below

commit 89262460471b4ef8767d2f1518f37841d7f550a2
Author: Gang Wu <[email protected]>
AuthorDate: Mon Dec 29 13:38:38 2025 +0800

    feat: implement delete file index (#435)
    
    Implemented the DeleteFileIndex and Builder to manage and efficiently
    filter delete files (equality deletes, position deletes, and deletion
    vectors) based on sequence numbers and partitions.
---
 src/iceberg/CMakeLists.txt                      |    2 +
 src/iceberg/delete_file_index.cc                |  776 +++++++++++++++
 src/iceberg/delete_file_index.h                 |  398 ++++++++
 src/iceberg/manifest/manifest_reader.cc         |   41 +-
 src/iceberg/manifest/manifest_reader.h          |    9 +
 src/iceberg/manifest/manifest_reader_internal.h |    3 +
 src/iceberg/meson.build                         |    3 +
 src/iceberg/metadata_columns.h                  |   41 +-
 src/iceberg/test/CMakeLists.txt                 |   15 +-
 src/iceberg/test/delete_file_index_test.cc      | 1190 +++++++++++++++++++++++
 src/iceberg/test/manifest_reader_test.cc        |   65 ++
 src/iceberg/util/content_file_util.cc           |  123 +++
 src/iceberg/util/content_file_util.h            |   63 ++
 src/iceberg/util/meson.build                    |    1 +
 14 files changed, 2706 insertions(+), 24 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 519757d2..27892935 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -20,6 +20,7 @@ set(ICEBERG_INCLUDES "$<BUILD_INTERFACE:${PROJECT_BINARY_DIR}/src>"
 set(ICEBERG_SOURCES
     arrow_c_data_guard_internal.cc
     catalog/memory/in_memory_catalog.cc
+    delete_file_index.cc
     expression/aggregate.cc
     expression/binder.cc
     expression/evaluator.cc
@@ -80,6 +81,7 @@ set(ICEBERG_SOURCES
     update/update_properties.cc
     update/update_sort_order.cc
     util/bucket_util.cc
+    util/content_file_util.cc
     util/conversions.cc
     util/decimal.cc
     util/gzip_internal.cc
diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc
new file mode 100644
index 00000000..2a96e9d3
--- /dev/null
+++ b/src/iceberg/delete_file_index.cc
@@ -0,0 +1,776 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/delete_file_index.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <iterator>
+#include <ranges>
+#include <vector>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/expression/projections.h"
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace internal {
+
+// Deserializes the serialized lower/upper bounds of the wrapped delete file into
+// Literals and caches them in `lower_bounds` / `upper_bounds`.
+//
+// Only the delete file's equality field IDs are visited. IDs the schema lookup
+// does not resolve and fields of nested type are skipped, as are bounds that are
+// absent or empty. After a complete pass `bounds_converted` is set so that later
+// calls return immediately. If a schema lookup or a deserialization fails, the
+// error is returned and the flag is left unset.
+Status EqualityDeleteFile::ConvertBoundsIfNeeded() const {
+  if (bounds_converted) {
+    return {};
+  }
+
+  const auto& file = *wrapped.data_file;
+
+  // Convert bounds for equality field IDs only
+  for (int32_t field_id : file.equality_ids) {
+    ICEBERG_ASSIGN_OR_RAISE(auto field, schema->FindFieldById(field_id));
+    if (!field.has_value()) {
+      continue;
+    }
+
+    const auto& schema_field = field.value().get();
+    if (schema_field.type()->is_nested()) {
+      continue;
+    }
+
+    const auto primitive_type = checked_pointer_cast<PrimitiveType>(schema_field.type());
+
+    // Lower bound: converted only when present and non-empty.
+    if (auto it = file.lower_bounds.find(field_id);
+        it != file.lower_bounds.cend() && !it->second.empty()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto lower, Literal::Deserialize(it->second, primitive_type));
+      lower_bounds.emplace(field_id, std::move(lower));
+    }
+
+    // Upper bound: converted only when present and non-empty.
+    if (auto it = file.upper_bounds.find(field_id);
+        it != file.upper_bounds.cend() && !it->second.empty()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto upper, Literal::Deserialize(it->second, primitive_type));
+      upper_bounds.emplace(field_id, std::move(upper));
+    }
+  }
+
+  bounds_converted = true;
+  return {};
+}
+
+// Decides whether an equality delete file may contain deletes for a data file.
+//
+// For every equality field ID of the delete file (IDs the schema lookup does not
+// resolve and nested fields are skipped) the function compares null statistics
+// and, when both sides carry bounds, value ranges. It returns false as soon as
+// one field proves that no row can match, and true otherwise. Errors from the
+// schema lookup or from deserializing the data file's bounds are propagated.
+Result<bool> CanContainEqDeletesForFile(const DataFile& data_file,
+                                        const EqualityDeleteFile& delete_file) {
+  // Whether to check data ranges or to assume that the ranges match.  If upper/lower
+  // bounds are missing, null counts may still be used to determine delete files can be
+  // skipped.
+  bool check_ranges = !data_file.lower_bounds.empty() &&
+                      !data_file.upper_bounds.empty() &&
+                      delete_file.HasLowerAndUpperBounds();
+
+  const auto* wrapped_delete_file = delete_file.wrapped.data_file.get();
+
+  for (int32_t field_id : wrapped_delete_file->equality_ids) {
+    ICEBERG_ASSIGN_OR_RAISE(auto found_field,
+                            delete_file.schema->FindFieldById(field_id));
+    if (!found_field.has_value()) {
+      continue;
+    }
+
+    const auto& field = found_field.value().get();
+    if (field.type()->is_nested()) {
+      continue;
+    }
+
+    bool is_required = !field.optional();
+    bool data_contains_null =
+        ContainsNull(data_file.null_value_counts, field_id, is_required);
+    bool delete_contains_null =
+        ContainsNull(wrapped_delete_file->null_value_counts, field_id, is_required);
+
+    if (data_contains_null && delete_contains_null) {
+      // Both have nulls - delete may apply
+      continue;
+    }
+
+    if (AllNull(data_file.null_value_counts, data_file.value_counts, field_id,
+                is_required) &&
+        AllNonNull(wrapped_delete_file->null_value_counts, field_id, is_required)) {
+      return false;  // Data is all null, delete has no nulls - cannot match
+    }
+
+    if (AllNull(wrapped_delete_file->null_value_counts, wrapped_delete_file->value_counts,
+                field_id, is_required) &&
+        AllNonNull(data_file.null_value_counts, field_id, is_required)) {
+      return false;  // Delete is all null, data has no nulls - cannot match
+    }
+
+    if (!check_ranges) {
+      continue;
+    }
+
+    // Check range overlap
+    auto data_lower_it = data_file.lower_bounds.find(field_id);
+    auto data_upper_it = data_file.upper_bounds.find(field_id);
+    if (data_lower_it == data_file.lower_bounds.cend() || data_lower_it->second.empty() ||
+        data_upper_it == data_file.upper_bounds.cend() || data_upper_it->second.empty()) {
+      continue;  // Missing bounds, assume may match
+    }
+
+    auto delete_lower = delete_file.LowerBound(field_id);
+    auto delete_upper = delete_file.UpperBound(field_id);
+    if (!delete_lower.has_value() || !delete_upper.has_value()) {
+      continue;  // Missing bounds, assume may match
+    }
+
+    // The data file's bounds are still serialized; deserialize them with the
+    // field's primitive type before comparing against the delete file's bounds.
+    auto primitive_type = checked_pointer_cast<PrimitiveType>(field.type());
+    ICEBERG_ASSIGN_OR_RAISE(auto data_lower,
+                            Literal::Deserialize(data_lower_it->second, primitive_type));
+    ICEBERG_ASSIGN_OR_RAISE(auto data_upper,
+                            Literal::Deserialize(data_upper_it->second, primitive_type));
+
+    if (!RangesOverlap(data_lower, data_upper, delete_lower->value().get(),
+                       delete_upper->value().get())) {
+      return false;  // Ranges don't overlap - cannot match
+    }
+  }
+
+  return true;
+}
+
+// PositionDeletes implementation
+
+// Appends a position delete entry to this group.
+//
+// The entry must carry a sequence number, otherwise the precondition check
+// fails. Adding an entry invalidates the sorted index, which is rebuilt on the
+// next call to IndexIfNeeded().
+Status PositionDeletes::Add(ManifestEntry&& entry) {
+  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+                   "Missing sequence number for position delete: {}",
+                   entry.data_file->file_path);
+
+  files_.push_back(std::move(entry));
+  indexed_ = false;  // sorted order and seqs_ are stale from here on
+
+  return {};
+}
+
+// Returns the delete files whose sequence number is greater than or equal to
+// `seq`, in ascending sequence-number order. The result is empty when every
+// indexed file has a smaller sequence number.
+std::vector<std::shared_ptr<DataFile>> PositionDeletes::Filter(int64_t seq) {
+  IndexIfNeeded();
+
+  std::vector<std::shared_ptr<DataFile>> matched;
+
+  // seqs_ is sorted, so the first element not less than `seq` marks the start
+  // of the matching suffix.
+  const auto first = std::ranges::lower_bound(seqs_, seq);
+  if (first == seqs_.end()) {
+    return matched;
+  }
+
+  matched.reserve(seqs_.end() - first);
+  for (auto it = files_.begin() + (first - seqs_.begin()); it != files_.end(); ++it) {
+    matched.push_back(it->data_file);
+  }
+  return matched;
+}
+
+// Returns every delete file held by this group, ordered by sequence number
+// (the index is brought up to date first).
+std::vector<std::shared_ptr<DataFile>> PositionDeletes::ReferencedDeleteFiles() {
+  IndexIfNeeded();
+  return files_ | std::views::transform(&ManifestEntry::data_file) |
+         std::ranges::to<std::vector<std::shared_ptr<DataFile>>>();
+}
+
+// Sorts the entries by sequence number and rebuilds `seqs_`, the parallel array
+// of sequence numbers that Filter() searches. Does nothing when the index is
+// already current.
+void PositionDeletes::IndexIfNeeded() {
+  if (indexed_) {
+    return;
+  }
+
+  // Sort by data sequence number
+  std::ranges::sort(files_, std::ranges::less{}, &ManifestEntry::sequence_number);
+
+  // Build sequence number array for binary search. Add() rejects entries without
+  // a sequence number, so value() is safe here.
+  seqs_ = files_ |
+          std::views::transform([](const auto& e) { return e.sequence_number.value(); }) |
+          std::ranges::to<std::vector<int64_t>>();
+
+  indexed_ = true;
+}
+
+// EqualityDeletes implementation
+
+// Wraps an equality delete entry together with this group's schema and appends
+// it to the group.
+//
+// The entry must carry a sequence number, otherwise the precondition check
+// fails. Adding an entry invalidates the sorted index.
+Status EqualityDeletes::Add(ManifestEntry&& entry) {
+  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+                   "Missing sequence number for equality delete: {}",
+                   entry.data_file->file_path);
+
+  const Schema* schema = &schema_;
+  files_.emplace_back(schema, std::move(entry));
+  indexed_ = false;  // sorted order and seqs_ are stale from here on
+
+  return {};
+}
+
+// Returns the equality delete files that may apply to `data_file`.
+//
+// Candidates are the files whose apply sequence number is greater than or equal
+// to `seq`; each candidate is then kept only if CanContainEqDeletesForFile()
+// reports that it may match the data file. Errors from that check are
+// propagated.
+Result<std::vector<std::shared_ptr<DataFile>>> EqualityDeletes::Filter(
+    int64_t seq, const DataFile& data_file) {
+  IndexIfNeeded();
+
+  auto iter = std::ranges::lower_bound(seqs_, seq);
+  if (iter == seqs_.end()) {
+    return {};
+  }
+  std::vector<std::shared_ptr<DataFile>> result;
+  result.reserve(seqs_.end() - iter);
+  for (auto& delete_file : files_ | std::views::drop(iter - seqs_.begin())) {
+    ICEBERG_ASSIGN_OR_RAISE(bool may_contain,
+                            CanContainEqDeletesForFile(data_file, delete_file));
+    if (may_contain) {
+      result.push_back(delete_file.wrapped.data_file);
+    }
+  }
+
+  return result;
+}
+
+// Returns every delete file held by this group, ordered by apply sequence
+// number (the index is brought up to date first).
+std::vector<std::shared_ptr<DataFile>> EqualityDeletes::ReferencedDeleteFiles() {
+  IndexIfNeeded();
+  return files_ |
+         std::views::transform([](const auto& f) { return f.wrapped.data_file; }) |
+         std::ranges::to<std::vector<std::shared_ptr<DataFile>>>();
+}
+
+// Sorts the wrapped delete files by apply sequence number and rebuilds `seqs_`,
+// the parallel array that Filter() searches. Does nothing when the index is
+// already current.
+void EqualityDeletes::IndexIfNeeded() {
+  if (indexed_) {
+    return;
+  }
+
+  // Sort by apply sequence number
+  std::ranges::sort(files_, std::ranges::less{},
+                    &EqualityDeleteFile::apply_sequence_number);
+
+  // Build sequence number array for binary search
+  seqs_ = files_ | std::views::transform(&EqualityDeleteFile::apply_sequence_number) |
+          std::ranges::to<std::vector<int64_t>>();
+
+  indexed_ = true;
+}
+
+}  // namespace internal
+
+// DeleteFileIndex implementation
+
+// Takes ownership of the five sub-indexes. Any of them may be null, meaning
+// that kind of delete is absent. The summary flags are computed once here:
+// equality deletes are present if the global or per-partition equality index is
+// non-empty; position deletes are present if the per-partition index, the
+// per-path index or the DV map is non-empty.
+DeleteFileIndex::DeleteFileIndex(
+    std::unique_ptr<internal::EqualityDeletes> global_deletes,
+    std::unique_ptr<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>
+        eq_deletes_by_partition,
+    std::unique_ptr<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>
+        pos_deletes_by_partition,
+    std::unique_ptr<
+        std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>>
+        pos_deletes_by_path,
+    std::unique_ptr<std::unordered_map<std::string, ManifestEntry>> dv_by_path)
+    : global_deletes_(std::move(global_deletes)),
+      eq_deletes_by_partition_(std::move(eq_deletes_by_partition)),
+      pos_deletes_by_partition_(std::move(pos_deletes_by_partition)),
+      pos_deletes_by_path_(std::move(pos_deletes_by_path)),
+      dv_by_path_(std::move(dv_by_path)) {
+  has_eq_deletes_ = (global_deletes_ && !global_deletes_->empty()) ||
+                    (eq_deletes_by_partition_ && !eq_deletes_by_partition_->empty());
+  has_pos_deletes_ = (pos_deletes_by_partition_ && !pos_deletes_by_partition_->empty()) ||
+                     (pos_deletes_by_path_ && !pos_deletes_by_path_->empty()) ||
+                     (dv_by_path_ && !dv_by_path_->empty());
+  is_empty_ = !has_eq_deletes_ && !has_pos_deletes_;
+}
+
+// Destructor and move operations are defaulted out of line, in this translation
+// unit.
+DeleteFileIndex::~DeleteFileIndex() = default;
+DeleteFileIndex::DeleteFileIndex(DeleteFileIndex&&) noexcept = default;
+DeleteFileIndex& DeleteFileIndex::operator=(DeleteFileIndex&&) noexcept = default;
+
+// True when the index holds neither equality nor position deletes.
+bool DeleteFileIndex::empty() const {
+  return is_empty_;
+}
+
+// True when the global or a per-partition equality delete index is non-empty.
+bool DeleteFileIndex::has_equality_deletes() const {
+  return has_eq_deletes_;
+}
+
+// True when a per-partition or per-path position delete index, or the DV map,
+// is non-empty.
+bool DeleteFileIndex::has_position_deletes() const {
+  return has_pos_deletes_;
+}
+
+// Collects every delete file referenced by the index.
+//
+// Files are appended in this order: global equality deletes, per-partition
+// equality deletes, per-partition position deletes, per-path position deletes,
+// and finally deletion vectors. Sub-indexes that are null are skipped.
+std::vector<std::shared_ptr<DataFile>> DeleteFileIndex::ReferencedDeleteFiles() const {
+  std::vector<std::shared_ptr<DataFile>> result;
+
+  // Moves one batch returned by a sub-index onto the end of the result.
+  auto append = [&result](std::vector<std::shared_ptr<DataFile>> files) {
+    std::ranges::move(files, std::back_inserter(result));
+  };
+
+  if (global_deletes_) {
+    append(global_deletes_->ReferencedDeleteFiles());
+  }
+
+  if (eq_deletes_by_partition_) {
+    for (const auto& [_, deletes] : *eq_deletes_by_partition_) {
+      append(deletes->ReferencedDeleteFiles());
+    }
+  }
+
+  if (pos_deletes_by_partition_) {
+    for (const auto& [_, deletes] : *pos_deletes_by_partition_) {
+      append(deletes->ReferencedDeleteFiles());
+    }
+  }
+
+  if (pos_deletes_by_path_) {
+    for (const auto& [_, deletes] : *pos_deletes_by_path_) {
+      append(deletes->ReferencedDeleteFiles());
+    }
+  }
+
+  if (dv_by_path_) {
+    for (const auto& [_, dv] : *dv_by_path_) {
+      result.push_back(dv.data_file);
+    }
+  }
+
+  return result;
+}
+
+// Looks up the delete files that apply to the data file of a manifest entry.
+//
+// The entry must have a non-null data file and a sequence number; otherwise the
+// precondition checks fail. The lookup itself is delegated to ForDataFile().
+Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForEntry(
+    const ManifestEntry& entry) const {
+  ICEBERG_PRECHECK(entry.data_file != nullptr, "Manifest entry has null data file");
+  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+                   "Missing sequence number for data file: {}",
+                   entry.data_file->file_path);
+  return ForDataFile(entry.sequence_number.value(), *entry.data_file);
+}
+
+// Returns the delete files that apply to `file` at `sequence_number`.
+//
+// The result lists global equality deletes, then per-partition equality
+// deletes, then either the file's deletion vector or, when there is no DV, the
+// per-partition and per-path position deletes. An empty index yields an empty
+// result. Errors from any of the lookups are propagated.
+Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::ForDataFile(
+    int64_t sequence_number, const DataFile& file) const {
+  if (is_empty_) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto global, FindGlobalDeletes(sequence_number, file));
+  ICEBERG_ASSIGN_OR_RAISE(auto eq_partition,
+                          FindEqPartitionDeletes(sequence_number, file));
+  ICEBERG_ASSIGN_OR_RAISE(auto dv, FindDV(sequence_number, file));
+
+  // Fast path: only a DV applies.
+  if (dv && global.empty() && eq_partition.empty()) {
+    return std::vector<std::shared_ptr<DataFile>>{std::move(dv)};
+  }
+
+  std::vector<std::shared_ptr<DataFile>> result;
+  result.reserve(global.size() + eq_partition.size() + 1);
+
+  std::ranges::move(global, std::back_inserter(result));
+  std::ranges::move(eq_partition, std::back_inserter(result));
+
+  if (dv) {
+    // When a DV exists, the position delete indexes are not consulted.
+    result.push_back(std::move(dv));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(auto pos_partition,
+                            FindPosPartitionDeletes(sequence_number, file));
+    ICEBERG_ASSIGN_OR_RAISE(auto pos_path, FindPathDeletes(sequence_number, file));
+    std::ranges::move(pos_partition, std::back_inserter(result));
+    std::ranges::move(pos_path, std::back_inserter(result));
+  }
+
+  return result;
+}
+
+// Filters the global equality deletes for `data_file` at sequence number `seq`.
+// Returns an empty list when there is no global equality index.
+Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindGlobalDeletes(
+    int64_t seq, const DataFile& data_file) const {
+  if (!global_deletes_) {
+    return {};
+  }
+  return global_deletes_->Filter(seq, data_file);
+}
+
+// Filters the equality deletes registered for the data file's partition (keyed
+// by spec ID and partition value). Returns an empty list when there is no
+// per-partition equality index or no group for that partition.
+Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindEqPartitionDeletes(
+    int64_t seq, const DataFile& data_file) const {
+  if (!eq_deletes_by_partition_) {
+    return {};
+  }
+
+  auto deletes =
+      eq_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition);
+  if (!deletes.has_value()) {
+    return {};
+  }
+  return deletes->get()->Filter(seq, data_file);
+}
+
+// Filters the position deletes registered for the data file's partition (keyed
+// by spec ID and partition value). Returns an empty list when there is no
+// per-partition position index or no group for that partition.
+Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindPosPartitionDeletes(
+    int64_t seq, const DataFile& data_file) const {
+  if (!pos_deletes_by_partition_) {
+    return {};
+  }
+
+  auto deletes =
+      pos_deletes_by_partition_->get(data_file.partition_spec_id, data_file.partition);
+  if (!deletes.has_value()) {
+    return {};
+  }
+
+  return deletes->get()->Filter(seq);
+}
+
+// Filters the position deletes registered for the data file's path. Returns an
+// empty list when there is no per-path index or no group for that path.
+Result<std::vector<std::shared_ptr<DataFile>>> DeleteFileIndex::FindPathDeletes(
+    int64_t seq, const DataFile& data_file) const {
+  if (!pos_deletes_by_path_) {
+    return {};
+  }
+
+  auto it = pos_deletes_by_path_->find(data_file.file_path);
+  if (it == pos_deletes_by_path_->end()) {
+    return {};
+  }
+
+  return it->second->Filter(seq);
+}
+
+// Looks up the deletion vector registered for the data file's path.
+//
+// Returns nullptr when there is no DV map or no DV for that path. When a DV is
+// found, its sequence number must be greater than or equal to `seq`; otherwise
+// the check fails and an error is returned.
+Result<std::shared_ptr<DataFile>> DeleteFileIndex::FindDV(
+    int64_t seq, const DataFile& data_file) const {
+  if (!dv_by_path_) {
+    return nullptr;
+  }
+
+  auto it = dv_by_path_->find(data_file.file_path);
+  if (it == dv_by_path_->end()) {
+    return nullptr;
+  }
+
+  ICEBERG_CHECK(it->second.sequence_number.value() >= seq,
+                "DV data sequence number {} must be greater than or equal to data file "
+                "sequence number {}",
+                it->second.sequence_number.value(), seq);
+
+  return it->second.data_file;
+}
+
+// Creates a builder that will read the given delete manifests through `io`.
+// Fails the precondition check when `io` is null.
+Result<DeleteFileIndex::Builder> DeleteFileIndex::BuilderFor(
+    std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests) {
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+
+  return Builder{std::move(io), std::move(delete_manifests)};
+}
+
+// Builder implementation
+
+// Stores the FileIO and the list of delete manifests; nothing is read until
+// Build() is called.
+DeleteFileIndex::Builder::Builder(std::shared_ptr<FileIO> io,
+                                  std::vector<ManifestFile> delete_manifests)
+    : io_{std::move(io)},
+      delete_manifests_{std::move(delete_manifests)} {}
+
+// Destructor and move operations are defaulted out of line, in this translation
+// unit.
+DeleteFileIndex::Builder::~Builder() = default;
+DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default;
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&) noexcept =
+    default;
+
+// Sets the partition specs, keyed by spec ID, that are looked up while loading
+// delete manifests and indexing equality deletes. Replaces any map supplied
+// earlier. Returns this builder for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById(
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id) {
+  auto& self = *this;
+  self.specs_by_id_ = std::move(specs_by_id);
+  return self;
+}
+
+// Sets the schema used when reading manifests and when resolving equality
+// field IDs. Returns this builder for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema(
+    std::shared_ptr<Schema> schema) {
+  auto& self = *this;
+  self.schema_ = std::move(schema);
+  return self;
+}
+
+// Sets the minimum sequence number. When loading, only delete files whose
+// sequence number is strictly greater than `seq` are kept. Returns this builder
+// for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) {
+  min_sequence_number_ = seq;
+  return *this;
+}
+
+// Adds a row-level data filter. When a data filter is already set, the new one
+// is AND-ed with it; otherwise it becomes the data filter. Returns this builder
+// for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::DataFilter(
+    std::shared_ptr<Expression> filter) {
+  if (data_filter_) {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(data_filter_,
+                                     And::Make(data_filter_, std::move(filter)));
+  } else {
+    data_filter_ = std::move(filter);
+  }
+  return *this;
+}
+
+// Adds a partition filter. When a partition filter is already set, the new one
+// is AND-ed with it; otherwise it becomes the partition filter. Returns this
+// builder for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::PartitionFilter(
+    std::shared_ptr<Expression> filter) {
+  if (partition_filter_) {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(partition_filter_,
+                                     And::Make(partition_filter_, std::move(filter)));
+  } else {
+    partition_filter_ = std::move(filter);
+  }
+  return *this;
+}
+
+// Sets the partition set that is handed to each manifest reader while loading.
+// Replaces any set supplied earlier. Returns this builder for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::FilterPartitions(
+    std::shared_ptr<PartitionSet> partition_set) {
+  auto& self = *this;
+  self.partition_set_ = std::move(partition_set);
+  return self;
+}
+
+// Sets the case-sensitivity flag passed to the filter projection, the manifest
+// evaluator and the manifest reader. Returns this builder for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  return *this;
+}
+
+// Makes loading pass an always-true row filter to the manifest readers instead
+// of the configured data filter. Returns this builder for chaining.
+DeleteFileIndex::Builder& DeleteFileIndex::Builder::IgnoreResiduals() {
+  auto& self = *this;
+  self.ignore_residuals_ = true;
+  return self;
+}
+
+// Reads the live entries of all matching delete manifests.
+//
+// A manifest is skipped when its content is not deletes, when it has neither
+// added nor existing files, or when the partition-level evaluator rejects it.
+// Entries are kept only when their sequence number is strictly greater than
+// the configured minimum. Column statistics of kept entries are reduced to the
+// columns needed later: the delete-file-path column for position deletes, the
+// equality field IDs otherwise. Fails when a manifest references an unknown
+// partition spec, when an entry lacks a data file or sequence number, or when
+// any projection, evaluation or read step reports an error.
+Result<std::vector<ManifestEntry>> DeleteFileIndex::Builder::LoadDeleteFiles() {
+  // Build expression caches per spec ID
+  std::unordered_map<int32_t, std::shared_ptr<Expression>> part_expr_cache;
+  std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;
+
+  auto data_filter = ignore_residuals_ ? True::Instance() : data_filter_;
+
+  // Filter and read manifests into manifest entries
+  std::vector<ManifestEntry> files;
+  for (const auto& manifest : delete_manifests_) {
+    if (manifest.content != ManifestContent::kDeletes) {
+      continue;
+    }
+    if (!manifest.has_added_files() && !manifest.has_existing_files()) {
+      continue;
+    }
+
+    const int32_t spec_id = manifest.partition_spec_id;
+    auto spec_iter = specs_by_id_.find(spec_id);
+    ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+                  "Partition spec ID {} not found when loading delete files", spec_id);
+
+    const auto& spec = spec_iter->second;
+
+    // Get or compute projected partition expression
+    if (!part_expr_cache.contains(spec_id) && data_filter_) {
+      auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
+      ICEBERG_ASSIGN_OR_RAISE(auto projected, projector->Project(data_filter_));
+      part_expr_cache[spec_id] = std::move(projected);
+    }
+
+    // Combine the explicit partition filter with the projected data filter.
+    // The same combined expression drives both the manifest-level evaluator and
+    // the entry-level partition filtering of the reader, so build it once.
+    auto partition_filter = partition_filter_;
+    if (auto it = part_expr_cache.find(spec_id); it != part_expr_cache.cend()) {
+      if (partition_filter) {
+        ICEBERG_ASSIGN_OR_RAISE(partition_filter,
+                                And::Make(partition_filter, it->second));
+      } else {
+        partition_filter = it->second;
+      }
+    }
+
+    // Get or create manifest evaluator
+    if (partition_filter && !eval_cache.contains(spec_id)) {
+      auto evaluator_filter = partition_filter;
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto evaluator,
+          ManifestEvaluator::MakePartitionFilter(std::move(evaluator_filter), spec,
+                                                 *schema_, case_sensitive_));
+      eval_cache[spec_id] = std::move(evaluator);
+    }
+
+    // Evaluate manifest against filter
+    if (auto it = eval_cache.find(spec_id); it != eval_cache.end()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto should_match, it->second->Evaluate(manifest));
+      if (!should_match) {
+        continue;  // Manifest doesn't match filter
+      }
+    }
+
+    // Read manifest entries
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io_, schema_, spec));
+
+    if (partition_filter) {
+      reader->FilterPartitions(std::move(partition_filter));
+    }
+    if (partition_set_) {
+      reader->FilterPartitions(partition_set_);
+    }
+    reader->FilterRows(data_filter).CaseSensitive(case_sensitive_).TryDropStats();
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
+    files.reserve(files.size() + entries.size());
+
+    for (auto& entry : entries) {
+      ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
+      ICEBERG_CHECK(entry.sequence_number.has_value(),
+                    "Missing sequence number for delete file: {}",
+                    entry.data_file->file_path);
+      if (entry.sequence_number.value() > min_sequence_number_) {
+        auto& file = *entry.data_file;
+        // keep minimum stats to avoid memory pressure
+        std::unordered_set<int32_t> columns =
+            file.content == DataFile::Content::kPositionDeletes
+                ? std::unordered_set<int32_t>{MetadataColumns::kDeleteFilePathColumnId}
+                : std::unordered_set<int32_t>(file.equality_ids.begin(),
+                                              file.equality_ids.end());
+        ContentFileUtil::DropUnselectedStats(*entry.data_file, columns);
+        files.emplace_back(std::move(entry));
+      }
+    }
+  }
+
+  return files;
+}
+
+// Registers a deletion vector under the path of the data file it references.
+//
+// The entry must have a data file, a sequence number and a referenced data file
+// path; otherwise the precondition checks fail. A second DV for the same data
+// file is rejected with a validation error.
+Status DeleteFileIndex::Builder::AddDV(
+    std::unordered_map<std::string, ManifestEntry>& dv_by_path, ManifestEntry&& entry) {
+  ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
+  ICEBERG_PRECHECK(entry.sequence_number.has_value(), "Missing sequence number for DV {}",
+                   entry.data_file->file_path);
+
+  const auto& path = entry.data_file->referenced_data_file;
+  ICEBERG_PRECHECK(path.has_value(), "DV must have a referenced data file");
+
+  // Copy the key before the entry is moved into the map, so it is still
+  // available for the error message.
+  std::string referenced_path = path.value();
+  const bool inserted =
+      dv_by_path.try_emplace(referenced_path, std::move(entry)).second;
+  if (!inserted) {
+    return ValidationFailed("Can't index multiple DVs for {}", referenced_path);
+  }
+  return {};
+}
+
+// Indexes a position delete entry.
+//
+// When ContentFileUtil::ReferencedDataFile() yields a path, the delete is
+// file-scoped and goes into `deletes_by_path` under that path. Otherwise it is
+// partition-scoped and goes into `deletes_by_partition` under the entry's spec
+// ID and partition value. Groups are created on first use. The entry must have
+// a data file and a sequence number.
+Status DeleteFileIndex::Builder::AddPositionDelete(
+    std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>&
+        deletes_by_path,
+    PartitionMap<std::unique_ptr<internal::PositionDeletes>>& deletes_by_partition,
+    ManifestEntry&& entry) {
+  ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
+  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+                   "Missing sequence number for position delete {}",
+                   entry.data_file->file_path);
+
+  ICEBERG_ASSIGN_OR_RAISE(auto referenced_path,
+                          ContentFileUtil::ReferencedDataFile(*entry.data_file));
+
+  if (referenced_path.has_value()) {
+    // File-scoped position delete
+    auto& deletes = deletes_by_path[referenced_path.value()];
+    if (!deletes) {
+      deletes = std::make_unique<internal::PositionDeletes>();
+    }
+    ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
+  } else {
+    // Partition-scoped position delete. `partition` refers into the DataFile,
+    // which stays alive through the shared pointer after the entry is moved.
+    int32_t spec_id = entry.data_file->partition_spec_id;
+    const auto& partition = entry.data_file->partition;
+
+    auto existing = deletes_by_partition.get(spec_id, partition);
+    if (existing.has_value()) {
+      ICEBERG_RETURN_UNEXPECTED(existing->get()->Add(std::move(entry)));
+    } else {
+      auto deletes = std::make_unique<internal::PositionDeletes>();
+      ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
+      deletes_by_partition.put(spec_id, partition, std::move(deletes));
+    }
+  }
+
+  return {};
+}
+
+// Indexes an equality delete entry.
+//
+// The entry's partition spec is looked up by ID; an unknown ID is an invalid
+// argument error. When the spec has no fields the delete is added to
+// `global_deletes`; otherwise it is added to the group for its spec ID and
+// partition value in `deletes_by_partition`, creating the group on first use.
+// The entry must have a data file and a sequence number.
+Status DeleteFileIndex::Builder::AddEqualityDelete(
+    internal::EqualityDeletes& global_deletes,
+    PartitionMap<std::unique_ptr<internal::EqualityDeletes>>& deletes_by_partition,
+    ManifestEntry&& entry) {
+  ICEBERG_PRECHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
+  ICEBERG_PRECHECK(entry.sequence_number.has_value(),
+                   "Missing sequence number for equality delete {}",
+                   entry.data_file->file_path);
+
+  int32_t spec_id = entry.data_file->partition_spec_id;
+
+  auto spec_it = specs_by_id_.find(spec_id);
+  if (spec_it == specs_by_id_.end()) {
+    return InvalidArgument("Unknown partition spec ID: {}", spec_id);
+  }
+  const auto& spec = spec_it->second;
+
+  if (spec->fields().empty()) {
+    // Global equality delete for unpartitioned tables
+    ICEBERG_RETURN_UNEXPECTED(global_deletes.Add(std::move(entry)));
+  } else {
+    // Partition-scoped equality delete. `partition` refers into the DataFile,
+    // which stays alive through the shared pointer after the entry is moved.
+    const auto& partition = entry.data_file->partition;
+
+    auto existing = deletes_by_partition.get(spec_id, partition);
+    if (existing.has_value()) {
+      ICEBERG_RETURN_UNEXPECTED(existing->get()->Add(std::move(entry)));
+    } else {
+      auto deletes = std::make_unique<internal::EqualityDeletes>(*schema_);
+      ICEBERG_RETURN_UNEXPECTED(deletes->Add(std::move(entry)));
+      deletes_by_partition.put(spec_id, partition, std::move(deletes));
+    }
+  }
+
+  return {};
+}
+
+// Loads the delete files and assembles the index.
+//
+// Fails when the builder has collected errors, or when the FileIO, the schema
+// or the partition specs are missing. Each loaded entry is routed by content
+// type: position deletes become either a DV or a file-/partition-scoped
+// position delete, equality deletes become global or partition-scoped equality
+// deletes, and any other content type is reported as not supported.
+// Sub-indexes that end up empty are passed to the index as null.
+Result<std::unique_ptr<DeleteFileIndex>> DeleteFileIndex::Builder::Build() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files");
+  ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete files");
+  ICEBERG_PRECHECK(!specs_by_id_.empty(),
+                   "Partition specs are required to load delete files");
+
+  std::vector<ManifestEntry> entries;
+  if (!delete_manifests_.empty()) {
+    ICEBERG_ASSIGN_OR_RAISE(entries, LoadDeleteFiles());
+  }
+
+  // Build index structures
+  auto global_deletes = std::make_unique<internal::EqualityDeletes>(*schema_);
+  auto eq_deletes_by_partition =
+      std::make_unique<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>();
+  auto pos_deletes_by_partition =
+      std::make_unique<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>();
+  auto pos_deletes_by_path = std::make_unique<
+      std::unordered_map<std::string, std::unique_ptr<internal::PositionDeletes>>>();
+  auto dv_by_path = std::make_unique<std::unordered_map<std::string, ManifestEntry>>();
+
+  for (auto& entry : entries) {
+    ICEBERG_CHECK(entry.data_file != nullptr, "ManifestEntry must have a data file");
+
+    switch (entry.data_file->content) {
+      case DataFile::Content::kPositionDeletes:
+        if (ContentFileUtil::IsDV(*entry.data_file)) {
+          ICEBERG_RETURN_UNEXPECTED(AddDV(*dv_by_path, std::move(entry)));
+        } else {
+          ICEBERG_RETURN_UNEXPECTED(AddPositionDelete(
+              *pos_deletes_by_path, *pos_deletes_by_partition, std::move(entry)));
+        }
+        break;
+
+      case DataFile::Content::kEqualityDeletes:
+        ICEBERG_RETURN_UNEXPECTED(AddEqualityDelete(
+            *global_deletes, *eq_deletes_by_partition, std::move(entry)));
+        break;
+
+      default:
+        return NotSupported("Unsupported content type: {}",
+                            static_cast<int>(entry.data_file->content));
+    }
+  }
+
+  // Empty sub-indexes are handed over as null.
+  return std::unique_ptr<DeleteFileIndex>(new DeleteFileIndex(
+      global_deletes->empty() ? nullptr : std::move(global_deletes),
+      eq_deletes_by_partition->empty() ? nullptr : std::move(eq_deletes_by_partition),
+      pos_deletes_by_partition->empty() ? nullptr : std::move(pos_deletes_by_partition),
+      pos_deletes_by_path->empty() ? nullptr : std::move(pos_deletes_by_path),
+      dv_by_path->empty() ? nullptr : std::move(dv_by_path)));
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h
new file mode 100644
index 00000000..03cc2661
--- /dev/null
+++ b/src/iceberg/delete_file_index.h
@@ -0,0 +1,398 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/delete_file_index.h
+/// An index of delete files by sequence number.
+
+#include <functional>
+#include <map>
+#include <memory>
+#include <optional>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/expression/literal.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/partition_value_util.h"
+
+namespace iceberg {
+
+namespace internal {
+
+/// \brief Wrapper for equality delete files that caches converted bounds.
+struct ICEBERG_EXPORT EqualityDeleteFile {
+  const Schema* schema;
+  ManifestEntry wrapped;
+  int64_t apply_sequence_number;  // = data_sequence_number - 1
+
+  // Lazily converted bounds for pruning
+  mutable std::unordered_map<int32_t, Literal> lower_bounds;
+  mutable std::unordered_map<int32_t, Literal> upper_bounds;
+  mutable bool bounds_converted = false;
+
+  EqualityDeleteFile(const Schema* schema, ManifestEntry&& entry)
+      : schema(schema),
+        wrapped(std::move(entry)),
+        apply_sequence_number(wrapped.sequence_number.value() - 1) {}
+
+  /// \brief Check if this delete file has both lower and upper bounds.
+  bool HasLowerAndUpperBounds() const {
+    return !wrapped.data_file->lower_bounds.empty() &&
+           !wrapped.data_file->upper_bounds.empty();
+  }
+
+  /// \brief Get the lower bound for a field ID.
+  Result<std::optional<std::reference_wrapper<const Literal>>> LowerBound(
+      int32_t id) const {
+    ICEBERG_RETURN_UNEXPECTED(ConvertBoundsIfNeeded());
+    auto it = lower_bounds.find(id);
+    return it != lower_bounds.cend() ? 
std::make_optional(std::cref(it->second))
+                                     : std::nullopt;
+  }
+
+  /// \brief Get the upper bound for a field ID.
+  Result<std::optional<std::reference_wrapper<const Literal>>> UpperBound(
+      int32_t id) const {
+    ICEBERG_RETURN_UNEXPECTED(ConvertBoundsIfNeeded());
+    auto it = upper_bounds.find(id);
+    return it != upper_bounds.cend() ? 
std::make_optional(std::cref(it->second))
+                                     : std::nullopt;
+  }
+
+ private:
+  /// \brief Convert bounds from binary to Literal. Implemented in .cc file.
+  Status ConvertBoundsIfNeeded() const;
+};
+
+/// \brief Check if two ranges overlap.
+inline bool RangesOverlap(const Literal& data_lower, const Literal& data_upper,
+                          const Literal& delete_lower, const Literal& 
delete_upper) {
+  if (data_lower > delete_upper) {
+    return false;
+  }
+  if (delete_lower > data_upper) {
+    return false;
+  }
+  return true;
+}
+
+/// \brief Check if a value count map indicates all values are null.
+inline bool AllNull(const std::map<int32_t, int64_t>& null_counts,
+                    const std::map<int32_t, int64_t>& value_counts, int32_t 
field_id,
+                    bool is_required) {
+  if (is_required) {
+    return false;
+  }
+
+  auto null_it = null_counts.find(field_id);
+  auto value_it = value_counts.find(field_id);
+  if (null_it == null_counts.cend() || value_it == value_counts.cend()) {
+    return false;
+  }
+
+  return null_it->second == value_it->second;
+}
+
+/// \brief Check if all values are non-null.
+inline bool AllNonNull(const std::map<int32_t, int64_t>& null_counts, int32_t 
field_id,
+                       bool is_required) {
+  if (is_required) {
+    return true;
+  }
+
+  auto it = null_counts.find(field_id);
+  if (it == null_counts.cend()) {
+    return false;
+  }
+
+  return it->second <= 0;
+}
+
+/// \brief Check if the column contains any null values.
+inline bool ContainsNull(const std::map<int32_t, int64_t>& null_counts, 
int32_t field_id,
+                         bool is_required) {
+  if (is_required) {
+    return false;
+  }
+
+  auto it = null_counts.find(field_id);
+  if (it == null_counts.cend()) {
+    return true;  // Unknown, assume may contain null
+  }
+
+  return it->second > 0;
+}
+
+/// \brief Check if an equality delete file can contain deletes for a data file.
+ICEBERG_EXPORT Result<bool> CanContainEqDeletesForFile(
+    const DataFile& data_file, const EqualityDeleteFile& delete_file);
+
+/// \brief A group of position delete files sorted by the sequence number they apply to.
+///
+/// Position delete files apply to data files with a sequence number <= the delete
+/// file's data sequence number.
+class ICEBERG_EXPORT PositionDeletes {
+ public:
+  PositionDeletes() = default;
+
+  /// \brief Add a position delete file to this group.
+  [[nodiscard]] Status Add(ManifestEntry&& entry);
+
+  /// \brief Returns all delete files with data_sequence_number >= the given sequence
+  /// number.
+  std::vector<std::shared_ptr<DataFile>> Filter(int64_t seq);
+
+  /// \brief Get all delete files in this group.
+  std::vector<std::shared_ptr<DataFile>> ReferencedDeleteFiles();
+
+  /// \brief Check if this group is empty.
+  bool empty() const { return files_.empty(); }
+
+ private:
+  void IndexIfNeeded();
+
+  std::vector<ManifestEntry> files_;
+  std::vector<int64_t> seqs_;
+  bool indexed_ = false;
+};
+
+/// \brief A group of equality delete files sorted by apply sequence number.
+///
+/// Equality deletes apply to data files with sequence number < the delete's
+/// data sequence number (i.e., apply_sequence_number = data_sequence_number - 1).
+class ICEBERG_EXPORT EqualityDeletes {
+ public:
+  explicit EqualityDeletes(const Schema& schema) : schema_(schema) {}
+
+  /// \brief Add an equality delete file to this group.
+  [[nodiscard]] Status Add(ManifestEntry&& entry);
+
+  /// \brief Filter equality deletes that apply to the given data file.
+  ///
+  /// Returns delete files where:
+  /// 1. apply_sequence_number >= the data file's sequence number
+  /// 2. The delete file's bounds may overlap with the data file
+  Result<std::vector<std::shared_ptr<DataFile>>> Filter(int64_t seq,
+                                                        const DataFile& 
data_file);
+
+  /// \brief Get all delete files in this group.
+  std::vector<std::shared_ptr<DataFile>> ReferencedDeleteFiles();
+
+  /// \brief Check if this group is empty.
+  bool empty() const { return files_.empty(); }
+
+ private:
+  void IndexIfNeeded();
+
+  const Schema& schema_;
+  std::vector<EqualityDeleteFile> files_;
+  std::vector<int64_t> seqs_;
+  bool indexed_ = false;
+};
+
+}  // namespace internal
+
+/// \brief An index of delete files by sequence number.
+///
+/// Use `DeleteFileIndex::Builder` to construct an index, and `ForDataFile()`
+/// or `ForEntry()` to get the delete files to apply to a given data file.
+///
+/// The index organizes delete files by:
+/// - Global equality deletes (apply to all partitions)
+/// - Partitioned equality deletes (apply to specific partitions)
+/// - Partitioned position deletes (apply to specific partitions)
+/// - File-scoped position deletes (apply to specific data files)
+/// - Deletion vectors (DVs) that reference specific data files
+class ICEBERG_EXPORT DeleteFileIndex {
+ public:
+  class Builder;
+
+  ~DeleteFileIndex();
+
+  DeleteFileIndex(DeleteFileIndex&&) noexcept;
+  DeleteFileIndex& operator=(DeleteFileIndex&&) noexcept;
+  DeleteFileIndex(const DeleteFileIndex&) = delete;
+  DeleteFileIndex& operator=(const DeleteFileIndex&) = delete;
+
+  /// \brief Check if this index is empty (has no delete files).
+  bool empty() const;
+
+  /// \brief Check if this index has any equality delete files.
+  bool has_equality_deletes() const;
+
+  /// \brief Check if this index has any position delete files.
+  bool has_position_deletes() const;
+
+  /// \brief Get all delete files referenced by this index.
+  /// TODO(gangwu): use lazy iterator to avoid large memory allocation.
+  std::vector<std::shared_ptr<DataFile>> ReferencedDeleteFiles() const;
+
+  /// \brief Get the delete files that apply to a manifest entry.
+  ///
+  /// \param entry The manifest entry to find delete files for
+  /// \return Delete files that should be applied when reading the data file
+  Result<std::vector<std::shared_ptr<DataFile>>> ForEntry(
+      const ManifestEntry& entry) const;
+
+  /// \brief Get the delete files that apply to a data file with a specific sequence
+  /// number.
+  ///
+  /// \param sequence_number The data sequence number of the data file
+  /// \param file The data file to find delete files for
+  /// \return Delete files that should be applied when reading the data file
+  Result<std::vector<std::shared_ptr<DataFile>>> ForDataFile(int64_t 
sequence_number,
+                                                             const DataFile& 
file) const;
+
+  /// \brief Create a builder for constructing a DeleteFileIndex from manifest files.
+  ///
+  /// \param io The FileIO to use for reading manifests
+  /// \param delete_manifests The delete manifests to index
+  /// \return A Builder instance
+  static Result<Builder> BuilderFor(std::shared_ptr<FileIO> io,
+                                    std::vector<ManifestFile> 
delete_manifests);
+
+ private:
+  friend class Builder;
+
+  // Private constructor used by Builder
+  DeleteFileIndex(
+      std::unique_ptr<internal::EqualityDeletes> global_deletes,
+      std::unique_ptr<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>
+          eq_deletes_by_partition,
+      std::unique_ptr<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>
+          pos_deletes_by_partition,
+      std::unique_ptr<
+          std::unordered_map<std::string, 
std::unique_ptr<internal::PositionDeletes>>>
+          pos_deletes_by_path,
+      std::unique_ptr<std::unordered_map<std::string, ManifestEntry>> 
dv_by_path);
+
+  // Helper methods for finding delete files
+  Result<std::vector<std::shared_ptr<DataFile>>> FindGlobalDeletes(
+      int64_t seq, const DataFile& data_file) const;
+  Result<std::vector<std::shared_ptr<DataFile>>> FindEqPartitionDeletes(
+      int64_t seq, const DataFile& data_file) const;
+  Result<std::vector<std::shared_ptr<DataFile>>> FindPosPartitionDeletes(
+      int64_t seq, const DataFile& data_file) const;
+  Result<std::vector<std::shared_ptr<DataFile>>> FindPathDeletes(
+      int64_t seq, const DataFile& data_file) const;
+  Result<std::shared_ptr<DataFile>> FindDV(int64_t seq, const DataFile& 
data_file) const;
+
+  // Index data structures
+  std::unique_ptr<internal::EqualityDeletes> global_deletes_;
+  std::unique_ptr<PartitionMap<std::unique_ptr<internal::EqualityDeletes>>>
+      eq_deletes_by_partition_;
+  std::unique_ptr<PartitionMap<std::unique_ptr<internal::PositionDeletes>>>
+      pos_deletes_by_partition_;
+  std::unique_ptr<
+      std::unordered_map<std::string, 
std::unique_ptr<internal::PositionDeletes>>>
+      pos_deletes_by_path_;
+  std::unique_ptr<std::unordered_map<std::string, ManifestEntry>> dv_by_path_;
+
+  bool has_eq_deletes_ = false;
+  bool has_pos_deletes_ = false;
+  bool is_empty_ = true;
+};
+
+class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
+ public:
+  /// \brief Construct a builder from manifest files.
+  Builder(std::shared_ptr<FileIO> io, std::vector<ManifestFile> 
delete_manifests);
+
+  ~Builder() override;
+
+  Builder(Builder&&) noexcept;
+  Builder& operator=(Builder&&) noexcept;
+  Builder(const Builder&) = delete;
+  Builder& operator=(const Builder&) = delete;
+
+  /// \brief Set the partition specs by ID.
+  Builder& SpecsById(
+      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id);
+
+  /// \brief Set the table schema.
+  ///
+  /// Required for filtering and expression evaluation.
+  Builder& WithSchema(std::shared_ptr<Schema> schema);
+
+  /// \brief Set the minimum sequence number for delete files.
+  ///
+  /// Only delete files with sequence number > min_sequence_number will be included.
+  Builder& AfterSequenceNumber(int64_t seq);
+
+  /// \brief Set a row-level data filter.
+  ///
+  /// This filter is projected to partition expressions for manifest pruning and
+  /// then residuals are applied to data files.
+  Builder& DataFilter(std::shared_ptr<Expression> filter);
+
+  /// \brief Set a partition filter expression.
+  Builder& PartitionFilter(std::shared_ptr<Expression> filter);
+
+  /// \brief Set a partition set to filter manifests.
+  Builder& FilterPartitions(std::shared_ptr<PartitionSet> partition_set);
+
+  /// \brief Set case sensitivity for column name matching.
+  Builder& CaseSensitive(bool case_sensitive);
+
+  /// \brief Ignore residual expressions after partition filtering.
+  Builder& IgnoreResiduals();
+
+  /// \brief Build the DeleteFileIndex.
+  Result<std::unique_ptr<DeleteFileIndex>> Build();
+
+ private:
+  // Load delete files from manifests
+  Result<std::vector<ManifestEntry>> LoadDeleteFiles();
+
+  // Add a DV to the index
+  Status AddDV(std::unordered_map<std::string, ManifestEntry>& dv_by_path,
+               ManifestEntry&& entry);
+
+  // Add a position delete file to the index
+  Status AddPositionDelete(
+      std::unordered_map<std::string, 
std::unique_ptr<internal::PositionDeletes>>&
+          deletes_by_path,
+      PartitionMap<std::unique_ptr<internal::PositionDeletes>>& 
deletes_by_partition,
+      ManifestEntry&& entry);
+
+  // Add an equality delete file to the index
+  Status AddEqualityDelete(
+      internal::EqualityDeletes& global_deletes,
+      PartitionMap<std::unique_ptr<internal::EqualityDeletes>>& 
deletes_by_partition,
+      ManifestEntry&& entry);
+
+  std::shared_ptr<FileIO> io_;
+  std::vector<ManifestFile> delete_manifests_;
+  int64_t min_sequence_number_ = 0;
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<Expression> data_filter_;
+  std::shared_ptr<Expression> partition_filter_;
+  std::shared_ptr<PartitionSet> partition_set_;
+  bool case_sensitive_ = true;
+  bool ignore_residuals_ = false;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_reader.cc b/src/iceberg/manifest/manifest_reader.cc
index ba1ff860..5f50d045 100644
--- a/src/iceberg/manifest/manifest_reader.cc
+++ b/src/iceberg/manifest/manifest_reader.cc
@@ -39,6 +39,7 @@
 #include "iceberg/schema_field.h"
 #include "iceberg/type.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/content_file_util.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
@@ -557,9 +558,9 @@ Result<std::vector<ManifestEntry>> ParseManifestEntry(
   return manifest_entries;
 }
 
-const std::vector<std::string> kStatsColumns = {"value_counts",     
"null_value_counts",
-                                                "nan_value_counts", 
"lower_bounds",
-                                                "upper_bounds",     
"record_count"};
+const std::vector<std::string> kStatsColumns = {
+    "value_counts", "null_value_counts", "nan_value_counts", "lower_bounds",
+    "upper_bounds", "column_sizes",      "record_count"};
 
 bool RequireStatsProjection(const std::shared_ptr<Expression>& row_filter,
                             const std::vector<std::string>& columns) {
@@ -592,6 +593,29 @@ Result<std::shared_ptr<Schema>> ProjectSchema(std::shared_ptr<Schema> schema,
 
 }  // namespace
 
+bool ManifestReader::ShouldDropStats(const std::vector<std::string>& columns) {
+  // Make sure we only drop all stats if we had projected all stats.
+  // We do not drop stats even if we had partially added some stats columns, except for
+  // record_count column.
+  // Since we don't want to keep stats map which could be huge in size just because we
+  // select record_count, which is a primitive type.
+  if (!columns.empty()) {
+    const std::unordered_set<std::string_view> selected(columns.cbegin(), 
columns.cend());
+    if (selected.contains(Schema::kAllColumns)) {
+      return false;
+    }
+    std::unordered_set<std::string_view> intersection;
+    for (const auto& col : kStatsColumns) {
+      if (selected.contains(col)) {
+        intersection.insert(col);
+      }
+    }
+    return intersection.empty() ||
+           (intersection.size() == 1 && intersection.contains("record_count"));
+  }
+  return false;
+}
+
 std::vector<std::string> ManifestReader::WithStatsColumns(
     const std::vector<std::string>& columns) {
   if (std::ranges::contains(columns, Schema::kAllColumns)) {
@@ -645,6 +669,11 @@ ManifestReader& ManifestReaderImpl::CaseSensitive(bool case_sensitive) {
   return *this;
 }
 
+ManifestReader& ManifestReaderImpl::TryDropStats() {
+  drop_stats_ = true;
+  return *this;
+}
+
 bool ManifestReaderImpl::HasPartitionFilter() const {
   ICEBERG_DCHECK(part_filter_, "Partition filter is not set");
   return part_filter_->op() != Expression::Operation::kTrue;
@@ -773,6 +802,8 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_liv
     ICEBERG_ASSIGN_OR_RAISE(metrics_evaluator, GetMetricsEvaluator());
   }
 
+  bool drop_stats = drop_stats_ && ShouldDropStats(columns_);
+
   while (true) {
     ICEBERG_ASSIGN_OR_RAISE(auto result, file_reader_->Next());
     if (!result.has_value()) {
@@ -812,6 +843,10 @@ Result<std::vector<ManifestEntry>> ManifestReaderImpl::ReadEntries(bool only_liv
         }
       }
 
+      if (drop_stats) {
+        ContentFileUtil::DropAllStats(*entry.data_file);
+      }
+
       manifest_entries.push_back(std::move(entry));
     }
   }
diff --git a/src/iceberg/manifest/manifest_reader.h b/src/iceberg/manifest/manifest_reader.h
index 748d0e33..ddfefc57 100644
--- a/src/iceberg/manifest/manifest_reader.h
+++ b/src/iceberg/manifest/manifest_reader.h
@@ -71,6 +71,15 @@ class ICEBERG_EXPORT ManifestReader {
   /// \brief Set case sensitivity for column name matching.
   virtual ManifestReader& CaseSensitive(bool case_sensitive) = 0;
 
+  /// \brief Try to drop stats from returned DataFile objects.
+  virtual ManifestReader& TryDropStats() = 0;
+
+  /// \brief Determine whether stats should be dropped based on selected columns.
+  ///
+  /// Returns true if the selected columns do not include any stats columns, or only
+  /// include record_count (which is a primitive, not a large map).
+  static bool ShouldDropStats(const std::vector<std::string>& columns);
+
   /// \brief Creates a reader for a manifest file.
   /// \param manifest A ManifestFile object containing metadata about the manifest.
   /// \param file_io File IO implementation to use.
diff --git a/src/iceberg/manifest/manifest_reader_internal.h b/src/iceberg/manifest/manifest_reader_internal.h
index ba804ffb..42263808 100644
--- a/src/iceberg/manifest/manifest_reader_internal.h
+++ b/src/iceberg/manifest/manifest_reader_internal.h
@@ -74,6 +74,8 @@ class ManifestReaderImpl : public ManifestReader {
 
   ManifestReader& CaseSensitive(bool case_sensitive) override;
 
+  ManifestReader& TryDropStats() override;
+
  private:
   /// \brief Read entries with optional live-only filtering.
   Result<std::vector<ManifestEntry>> ReadEntries(bool only_live);
@@ -111,6 +113,7 @@ class ManifestReaderImpl : public ManifestReader {
   std::shared_ptr<Expression> row_filter_{True::Instance()};
   std::shared_ptr<PartitionSet> partition_set_;
   bool case_sensitive_{true};
+  bool drop_stats_{false};
 
   // Lazy fields
   std::unique_ptr<Reader> file_reader_;
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 8327ca2e..f8022b09 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -42,6 +42,7 @@ iceberg_include_dir = include_directories('..')
 iceberg_sources = files(
     'arrow_c_data_guard_internal.cc',
     'catalog/memory/in_memory_catalog.cc',
+    'delete_file_index.cc',
     'expression/aggregate.cc',
     'expression/binder.cc',
     'expression/evaluator.cc',
@@ -102,6 +103,7 @@ iceberg_sources = files(
     'update/update_properties.cc',
     'update/update_sort_order.cc',
     'util/bucket_util.cc',
+    'util/content_file_util.cc',
     'util/conversions.cc',
     'util/decimal.cc',
     'util/gzip_internal.cc',
@@ -165,6 +167,7 @@ install_headers(
         'arrow_c_data.h',
         'catalog.h',
         'constants.h',
+        'delete_file_index.h',
         'exception.h',
         'file_format.h',
         'file_io.h',
diff --git a/src/iceberg/metadata_columns.h b/src/iceberg/metadata_columns.h
index 18aeb2b8..61f07c48 100644
--- a/src/iceberg/metadata_columns.h
+++ b/src/iceberg/metadata_columns.h
@@ -39,19 +39,22 @@ struct ICEBERG_EXPORT MetadataColumns {
   constexpr static int32_t kInt32Max = std::numeric_limits<int32_t>::max();
 
   // IDs kInt32Max - (1-100) are used for metadata columns
-  inline static const SchemaField kFilePath =
-      SchemaField::MakeRequired(kInt32Max - 1, "_file", iceberg::string(),
-                                "Path of the file in which a row is stored");
+  constexpr static int32_t kFilePathColumnId = kInt32Max - 1;
+  inline static const SchemaField kFilePath = SchemaField::MakeRequired(
+      kFilePathColumnId, "_file", string(), "Path of the file in which a row 
is stored");
 
+  constexpr static int32_t kFilePositionColumnId = kInt32Max - 2;
   inline static const SchemaField kRowPosition =
-      SchemaField::MakeRequired(kInt32Max - 2, "_pos", iceberg::int64(),
+      SchemaField::MakeRequired(kFilePositionColumnId, "_pos", int64(),
                                 "Ordinal position of a row in the source data 
file");
 
+  constexpr static int32_t kIsDeletedColumnId = kInt32Max - 3;
   inline static const SchemaField kIsDeleted = SchemaField::MakeRequired(
-      kInt32Max - 3, "_deleted", iceberg::binary(), "Whether the row has been 
deleted");
+      kIsDeletedColumnId, "_deleted", binary(), "Whether the row has been 
deleted");
 
+  constexpr static int32_t kSpecIdColumnId = kInt32Max - 4;
   inline static const SchemaField kSpecId =
-      SchemaField::MakeRequired(kInt32Max - 4, "_spec_id", iceberg::int32(),
+      SchemaField::MakeRequired(kSpecIdColumnId, "_spec_id", int32(),
                                 "Spec ID used to track the file containing a 
row");
 
   // The partition column type depends on all specs in the table
@@ -64,35 +67,41 @@ struct ICEBERG_EXPORT MetadataColumns {
   constexpr static int32_t kContentSizeInBytesColumnId = kInt32Max - 7;
 
   // IDs kInt32Max - (101-200) are used for reserved columns
+  constexpr static int32_t kDeleteFilePathColumnId = kInt32Max - 101;
   inline static const SchemaField kDeleteFilePath =
-      SchemaField::MakeRequired(kInt32Max - 101, "file_path", 
iceberg::string(),
+      SchemaField::MakeRequired(kDeleteFilePathColumnId, "file_path", string(),
                                 "Path of a file in which a deleted row is 
stored");
 
+  constexpr static int32_t kDeleteFilePosColumnId = kInt32Max - 102;
   inline static const SchemaField kDeleteFilePos =
-      SchemaField::MakeRequired(kInt32Max - 102, "pos", iceberg::int64(),
+      SchemaField::MakeRequired(kDeleteFilePosColumnId, "pos", int64(),
                                 "Ordinal position of a deleted row in the data 
file");
 
   // The row column type depends on the table schema
-  constexpr static int32_t kDeleteFileRowFieldId = kInt32Max - 103;
+  constexpr static int32_t kDeleteFileRowColumnId = kInt32Max - 103;
   constexpr static std::string_view kDeleteFileRowFieldName = "row";
   constexpr static std::string_view kDeleteFileRowDoc = "Deleted row values";
 
+  constexpr static int32_t kChangeTypeColumnId = kInt32Max - 104;
   inline static const SchemaField kChangeType = SchemaField::MakeRequired(
-      kInt32Max - 104, "_change_type", iceberg::string(), "Record type in 
changelog");
+      kChangeTypeColumnId, "_change_type", string(), "Record type in 
changelog");
 
-  inline static const SchemaField kChangeOrdinal =
-      SchemaField::MakeOptional(kInt32Max - 105, "_change_ordinal", 
iceberg::int32(),
-                                "Change ordinal in changelog");
+  constexpr static int32_t kChangeOrdinalColumnId = kInt32Max - 105;
+  inline static const SchemaField kChangeOrdinal = SchemaField::MakeOptional(
+      kChangeOrdinalColumnId, "_change_ordinal", int32(), "Change ordinal in 
changelog");
 
+  constexpr static int32_t kCommitSnapshotIdColumnId = kInt32Max - 106;
   inline static const SchemaField kCommitSnapshotId = 
SchemaField::MakeOptional(
-      kInt32Max - 106, "_commit_snapshot_id", iceberg::int64(), "Commit 
snapshot ID");
+      kCommitSnapshotIdColumnId, "_commit_snapshot_id", int64(), "Commit 
snapshot ID");
 
+  constexpr static int32_t kRowIdColumnId = kInt32Max - 107;
   inline static const SchemaField kRowId =
-      SchemaField::MakeOptional(kInt32Max - 107, "_row_id", iceberg::int64(),
+      SchemaField::MakeOptional(kRowIdColumnId, "_row_id", int64(),
                                 "Implicit row ID that is automatically 
assigned");
 
+  constexpr static int32_t kLastUpdatedSequenceNumberColumnId = kInt32Max - 
108;
   inline static const SchemaField kLastUpdatedSequenceNumber = 
SchemaField::MakeOptional(
-      kInt32Max - 108, "_last_updated_sequence_number", iceberg::int64(),
+      kLastUpdatedSequenceNumberColumnId, "_last_updated_sequence_number", 
int64(),
       "Sequence number when the row was last updated");
 
   /// \brief Get the set of metadata field IDs.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 4b2c0f47..b7f23c53 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -120,11 +120,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    avro_data_test.cc
                    avro_test.cc
                    avro_schema_test.cc
-                   avro_stream_test.cc
-                   manifest_list_versions_test.cc
-                   manifest_reader_stats_test.cc
-                   manifest_reader_test.cc
-                   manifest_writer_versions_test.cc)
+                   avro_stream_test.cc)
 
   add_iceberg_test(arrow_test
                    USE_BUNDLE
@@ -143,6 +139,15 @@ if(ICEBERG_BUILD_BUNDLE)
                    eval_expr_test.cc
                    evaluator_test.cc)
 
+  add_iceberg_test(manifest_test
+                   USE_BUNDLE
+                   SOURCES
+                   delete_file_index_test.cc
+                   manifest_list_versions_test.cc
+                   manifest_reader_stats_test.cc
+                   manifest_reader_test.cc
+                   manifest_writer_versions_test.cc)
+
   add_iceberg_test(parquet_test
                    USE_BUNDLE
                    SOURCES
diff --git a/src/iceberg/test/delete_file_index_test.cc b/src/iceberg/test/delete_file_index_test.cc
new file mode 100644
index 00000000..e091f5ea
--- /dev/null
+++ b/src/iceberg/test/delete_file_index_test.cc
@@ -0,0 +1,1190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/delete_file_index.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class DeleteFileIndexTest : public testing::TestWithParam<int> {  // param: format version passed to WriteDeleteManifest
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+
+    file_io_ = arrow::MakeMockFileIO();
+
+    // Table schema: required int32 "id" (field 1) and required string "data" (field 2)
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
+        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
+
+    // Partitioned spec (spec_id 1): bucket(16) on "data" (source field 2), partition field 1000
+    ICEBERG_UNWRAP_OR_FAIL(
+        partitioned_spec_,
+        PartitionSpec::Make(
+            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
+                                           "data_bucket", 
Transform::Bucket(16))}));
+
+    // Unpartitioned spec, used by the unpartitioned data file and by the tests' "global" deletes
+    unpartitioned_spec_ = PartitionSpec::Unpartitioned();
+
+    // Sample data files: a/b/c carry partition values 0/1/2 of the partitioned spec; one is unpartitioned
+    file_a_ = MakeDataFile("/path/to/data-a.parquet", 
PartitionValues({Literal::Int(0)}),
+                           partitioned_spec_->spec_id());
+    file_b_ = MakeDataFile("/path/to/data-b.parquet", 
PartitionValues({Literal::Int(1)}),
+                           partitioned_spec_->spec_id());
+    file_c_ = MakeDataFile("/path/to/data-c.parquet", 
PartitionValues({Literal::Int(2)}),
+                           partitioned_spec_->spec_id());
+    unpartitioned_file_ = MakeDataFile("/path/to/data-unpartitioned.parquet",
+                                       PartitionValues(std::vector<Literal>{}),
+                                       unpartitioned_spec_->spec_id());
+  }
+
+  std::string MakeManifestPath() {  // distinct name per call: static counter + current clock ticks
+    static int counter = 0;
+    return std::format("manifest-{}-{}.avro", counter++,
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
+                                         const PartitionValues& partition,
+                                         int32_t spec_id, int64_t record_count 
= 1) {
+    return std::make_shared<DataFile>(DataFile{  // .content is not set here (left at DataFile's default)
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = record_count,
+        .file_size_in_bytes = 10,
+        .sort_order_id = 0,
+        .partition_spec_id = spec_id,
+    });
+  }
+
+  std::shared_ptr<DataFile> MakePositionDeleteFile(
+      const std::string& path, const PartitionValues& partition, int32_t 
spec_id,
+      std::optional<std::string> referenced_file = std::nullopt) {
+    return std::make_shared<DataFile>(DataFile{  // Parquet position delete; referenced file is optional
+        .content = DataFile::Content::kPositionDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .partition_spec_id = spec_id,
+        .referenced_data_file = referenced_file,
+    });
+  }
+
+  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+                                                   const PartitionValues& 
partition,
+                                                   int32_t spec_id,
+                                                   std::vector<int> 
equality_ids = {1}) {
+    return std::make_shared<DataFile>(DataFile{  // Parquet equality delete on field IDs `equality_ids`
+        .content = DataFile::Content::kEqualityDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .equality_ids = std::move(equality_ids),
+        .partition_spec_id = spec_id,
+    });
+  }
+
+  std::shared_ptr<DataFile> MakeDV(const std::string& path,
+                                   const PartitionValues& partition, int32_t 
spec_id,
+                                   const std::string& referenced_file,
+                                   int64_t content_offset = 4L,
+                                   int64_t content_size = 6L) {
+    return std::make_shared<DataFile>(DataFile{  // Puffin position delete with content offset/size set
+        .content = DataFile::Content::kPositionDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kPuffin,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .partition_spec_id = spec_id,
+        .referenced_data_file = referenced_file,
+        .content_offset = content_offset,
+        .content_size_in_bytes = content_size,
+    });
+  }
+
+  ManifestEntry MakeDeleteEntry(int64_t snapshot_id, int64_t sequence_number,
+                                std::shared_ptr<DataFile> file,
+                                ManifestStatus status = 
ManifestStatus::kAdded) {
+    return ManifestEntry{  // data and file sequence numbers are both set to `sequence_number`
+        .status = status,
+        .snapshot_id = snapshot_id,
+        .sequence_number = sequence_number,
+        .file_sequence_number = sequence_number,
+        .data_file = std::move(file),
+    };
+  }
+
+  ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
+                                   std::vector<ManifestEntry> entries,
+                                   std::shared_ptr<PartitionSpec> spec) {
+    const std::string manifest_path = MakeManifestPath();
+
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);  // stays an error unless version is 2 or 3
+
+    if (format_version == 2) {
+      writer_result = ManifestWriter::MakeV2Writer(
+          snapshot_id, manifest_path, file_io_, spec, schema_, 
ManifestContent::kDeletes);
+    } else if (format_version == 3) {
+      writer_result = ManifestWriter::MakeV3Writer(
+          snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, 
spec,
+          schema_, ManifestContent::kDeletes);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
+    return {{partitioned_spec_->spec_id(), partitioned_spec_},  // both fixture specs, keyed by spec_id
+            {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
+  }
+
+  Result<std::unique_ptr<DeleteFileIndex>> BuildIndex(
+      std::vector<ManifestFile> delete_manifests,
+      std::optional<int64_t> after_sequence_number = std::nullopt) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto builder, DeleteFileIndex::BuilderFor(file_io_, 
std::move(delete_manifests)));
+    builder.SpecsById(GetSpecsById()).WithSchema(schema_);
+    if (after_sequence_number.has_value()) {  // the filter is configured only when a value is supplied
+      builder.AfterSequenceNumber(after_sequence_number.value());
+    }
+    return builder.Build();
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> partitioned_spec_;
+  std::shared_ptr<PartitionSpec> unpartitioned_spec_;
+
+  std::shared_ptr<DataFile> file_a_;
+  std::shared_ptr<DataFile> file_b_;
+  std::shared_ptr<DataFile> file_c_;
+  std::shared_ptr<DataFile> unpartitioned_file_;
+
+  // Helper returning the file_path of every given file (in input order) for matcher-based comparison
+  static std::vector<std::string> GetPaths(
+      const std::vector<std::shared_ptr<DataFile>>& files) {
+    return std::ranges::transform_view(files,
+                                       [](const auto& f) { return 
f->file_path; }) |
+           std::ranges::to<std::vector<std::string>>();
+  }
+};
+
+TEST_P(DeleteFileIndexTest, TestEmptyIndex) {
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({}));  // index built from zero delete manifests
+
+  EXPECT_TRUE(index->empty());
+  EXPECT_FALSE(index->has_equality_deletes());
+  EXPECT_FALSE(index->has_position_deletes());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));  // lookup still succeeds
+  EXPECT_TRUE(deletes.empty());
+}
+
+TEST_P(DeleteFileIndexTest, TestMinSequenceNumberFilteringForFiles) {
+  int version = GetParam();
+
+  auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet",
+                                            
PartitionValues(std::vector<Literal>{}),
+                                            unpartitioned_spec_->spec_id());
+  auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet",
+                                            
PartitionValues(std::vector<Literal>{}),
+                                            unpartitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, 
eq_delete_1));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, 
eq_delete_2));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      unpartitioned_spec_);
+
+  // Build index with AfterSequenceNumber(4): eq_delete_1 (seq 4) sits on the boundary, eq_delete_2 is at seq 6
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}, 
/*after_sequence_number=*/4));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+  EXPECT_FALSE(index->has_position_deletes());
+
+  // Only the delete file with seq > 4 should be included (eq_delete_2, seq=6); the bound is exclusive
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, 
*unpartitioned_file_));
+  EXPECT_EQ(deletes.size(), 1);
+  EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete-2.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestUnpartitionedDeletes) {
+  int version = GetParam();
+
+  auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet",
+                                            
PartitionValues(std::vector<Literal>{}),
+                                            unpartitioned_spec_->spec_id());
+  auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet",
+                                            
PartitionValues(std::vector<Literal>{}),
+                                            unpartitioned_spec_->spec_id());
+  auto pos_delete_1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet",
+                                             
PartitionValues(std::vector<Literal>{}),
+                                             unpartitioned_spec_->spec_id());
+  auto pos_delete_2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet",
+                                             
PartitionValues(std::vector<Literal>{}),
+                                             unpartitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, 
eq_delete_1));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, 
eq_delete_2));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/5, 
pos_delete_1));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, 
pos_delete_2));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      unpartitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+  EXPECT_TRUE(index->has_position_deletes());
+
+  // All deletes should apply to seq 0 (every delete has a larger sequence number)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, 
*unpartitioned_file_));
+    EXPECT_EQ(deletes.size(), 4);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre(
+                    "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet",
+                    "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // All deletes should apply to seq 3 (still below the oldest delete, eq_delete_1 at seq 4)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(3, 
*unpartitioned_file_));
+    EXPECT_EQ(deletes.size(), 4);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre(
+                    "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet",
+                    "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // Last 3 deletes should apply to seq 4 (eq_delete_2, pos_delete_1, 
pos_delete_2)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(4, 
*unpartitioned_file_));
+    EXPECT_EQ(deletes.size(), 3);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+                                              "/path/to/pos-delete-1.parquet",
+                                              
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // Last 3 deletes should apply to seq 5 (pos_delete_1 shares seq 5 with the data and still applies)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(5, 
*unpartitioned_file_));
+    EXPECT_EQ(deletes.size(), 3);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+                                              "/path/to/pos-delete-1.parquet",
+                                              
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // Last delete should apply to seq 6 (only pos_delete_2; eq_delete_2 at the same seq 6 is not returned)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(6, 
*unpartitioned_file_));
+    EXPECT_EQ(deletes.size(), 1);
+    EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete-2.parquet");
+  }
+
+  // No deletes should apply to seq 7 (newer than every delete file)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(7, 
*unpartitioned_file_));
+    EXPECT_TRUE(deletes.empty());
+  }
+
+  // Global (unpartitioned-spec) equality deletes should apply to a partitioned file
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+    // Only equality deletes are global, position deletes are not
+    EXPECT_EQ(deletes.size(), 2);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre("/path/to/eq-delete-1.parquet",
+                                              "/path/to/eq-delete-2.parquet"));
+  }
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedDeleteIndex) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});  // same partition value as file_a_
+  auto eq_delete_1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", 
partition_a,
+                                            partitioned_spec_->spec_id());
+  auto eq_delete_2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", 
partition_a,
+                                            partitioned_spec_->spec_id());
+  auto pos_delete_1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet", 
partition_a,
+                                             partitioned_spec_->spec_id());
+  auto pos_delete_2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet", 
partition_a,
+                                             partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/4, 
eq_delete_1));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, 
eq_delete_2));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/5, 
pos_delete_1));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/6, 
pos_delete_2));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+  EXPECT_TRUE(index->has_position_deletes());
+
+  // All deletes should apply to file_a_ at seq 0 (every delete has a larger sequence number)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+    EXPECT_EQ(deletes.size(), 4);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre(
+                    "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet",
+                    "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // All deletes should apply to file_a_ at seq 3 (still below eq_delete_1 at seq 4)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(3, *file_a_));
+    EXPECT_EQ(deletes.size(), 4);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre(
+                    "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet",
+                    "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // Last 3 deletes should apply to file_a_ at seq 4 (eq_delete_1 at the same seq 4 drops out)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(4, *file_a_));
+    EXPECT_EQ(deletes.size(), 3);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+                                              "/path/to/pos-delete-1.parquet",
+                                              
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // Last 3 deletes should apply to file_a_ at seq 5 (pos_delete_1 at the same seq 5 still applies)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(5, *file_a_));
+    EXPECT_EQ(deletes.size(), 3);
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+                                              "/path/to/pos-delete-1.parquet",
+                                              
"/path/to/pos-delete-2.parquet"));
+  }
+
+  // Last delete should apply to file_a_ at seq 6 (only pos_delete_2)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(6, *file_a_));
+    EXPECT_EQ(deletes.size(), 1);
+    EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete-2.parquet");
+  }
+
+  // No deletes should apply to file_a_ at seq 7 (newer than every delete file)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(7, *file_a_));
+    EXPECT_TRUE(deletes.empty());
+  }
+
+  // No deletes should apply to file_b_ (different partition: value 1 vs. 0)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_b_));
+    EXPECT_TRUE(deletes.empty());
+  }
+
+  // No deletes should apply to file_c_ (different partition: value 2 vs. 0)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_c_));
+    EXPECT_TRUE(deletes.empty());
+  }
+
+  // No deletes should apply to unpartitioned file (different spec)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, 
*unpartitioned_file_));
+    EXPECT_TRUE(deletes.empty());
+  }
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithPartitionPosDeletes) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});  // same partition value as file_a_
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", 
partition_a,
+                                           partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
pos_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_FALSE(index->has_equality_deletes());
+  EXPECT_TRUE(index->has_position_deletes());
+
+  // Position delete (seq 2) should apply to file_a_ at seq 1: same partition, newer delete
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+  EXPECT_EQ(deletes.size(), 1);
+  EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithPartitionEqDeletes) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});  // same partition value as file_a_
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", 
partition_a,
+                                          partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
eq_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+  EXPECT_FALSE(index->has_position_deletes());
+
+  // Equality delete (seq 2) should apply to file_a_ at seq 1: same partition, newer delete
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+  EXPECT_EQ(deletes.size(), 1);
+  EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithUnrelatedPartitionDeletes) 
{
+  int version = GetParam();
+
+  // Create deletes for partition A (partition value 0, the one file_a_ uses)
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", 
partition_a,
+                                           partitioned_spec_->spec_id());
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", 
partition_a,
+                                          partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
pos_delete));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
eq_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // No deletes should apply to file_b_ (partition value 1), even though the deletes are newer
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_b_));
+  EXPECT_TRUE(deletes.empty());
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableWithOlderPartitionDeletes) {
+  int version = GetParam();
+  if (version >= 3) {
+    GTEST_SKIP() << "DVs are not filtered using sequence numbers in V3+";
+  }
+
+  auto partition_a = PartitionValues({Literal::Int(0)});  // same partition value as file_a_
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", 
partition_a,
+                                           partitioned_spec_->spec_id());
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", 
partition_a,
+                                          partitioned_spec_->spec_id());
+
+  // Delete files have sequence number 1, older than the data sequence number (2) queried below
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
eq_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // Data file with sequence number 2 should not have any deletes applied
+  // (deletes were committed before the data file, although the partition matches)
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(2, *file_a_));
+  EXPECT_TRUE(deletes.empty());
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableScanWithGlobalDeletes) {
+  int version = GetParam();
+  if (version >= 3) {
+    GTEST_SKIP() << "Different behavior for position deletes in V3";
+  }
+
+  // Create unpartitioned equality and position deletes (both written at sequence number 2)
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
+                                          
PartitionValues(std::vector<Literal>{}),
+                                          unpartitioned_spec_->spec_id());
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
+                                           
PartitionValues(std::vector<Literal>{}),
+                                           unpartitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
eq_delete));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
pos_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      unpartitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // Only global equality deletes should apply to partitioned file_a_; the unpartitioned position delete must not
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+  EXPECT_EQ(deletes.size(), 1);
+  EXPECT_EQ(deletes[0]->file_path, "/path/to/eq-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, 
TestPartitionedTableScanWithGlobalAndPartitionDeletes) {
+  int version = GetParam();
+  if (version >= 3) {
+    GTEST_SKIP() << "Different behavior for position deletes in V3";
+  }
+
+  // Create partition-scoped equality delete (partition value 0, sequence number 2)
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto partition_eq_delete = MakeEqualityDeleteFile(
+      "/path/to/partition-eq-delete.parquet", partition_a, 
partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> partition_entries;
+  partition_entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
partition_eq_delete));
+
+  auto partition_manifest = WriteDeleteManifest(
+      version, /*snapshot_id=*/1000L, std::move(partition_entries), 
partitioned_spec_);
+
+  // Create unpartitioned equality and position deletes (sequence number 3, in a second manifest)
+  auto global_eq_delete = 
MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet",
+                                                 
PartitionValues(std::vector<Literal>{}),
+                                                 
unpartitioned_spec_->spec_id());
+  auto global_pos_delete = 
MakePositionDeleteFile("/path/to/global-pos-delete.parquet",
+                                                  
PartitionValues(std::vector<Literal>{}),
+                                                  
unpartitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> global_entries;
+  global_entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/3, 
global_eq_delete));
+  global_entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/3, 
global_pos_delete));
+
+  auto global_manifest = WriteDeleteManifest(
+      version, /*snapshot_id=*/1001L, std::move(global_entries), 
unpartitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({partition_manifest, 
global_manifest}));
+
+  // Both partition-scoped and global equality deletes should apply to file_a_; the global position delete must not
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+  EXPECT_EQ(deletes.size(), 2);
+  EXPECT_THAT(GetPaths(deletes),
+              
testing::UnorderedElementsAre("/path/to/partition-eq-delete.parquet",
+                                            
"/path/to/global-eq-delete.parquet"));
+}
+
+TEST_P(DeleteFileIndexTest, TestPartitionedTableSequenceNumbers) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});  // same partition value as file_a_
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", 
partition_a,
+                                          partitioned_spec_->spec_id());
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", 
partition_a,
+                                           partitioned_spec_->spec_id());
+
+  // Both deletes get sequence number 1, the same one used for the data file below (same commit)
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
eq_delete));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // Data file with sequence number 1 should only have position deletes applied
+  // (equality deletes apply only to data with seq < delete seq, so the equal-seq one is skipped)
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, *file_a_));
+  EXPECT_EQ(deletes.size(), 1);
+  EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet");
+}
+
+TEST_P(DeleteFileIndexTest, TestUnpartitionedTableSequenceNumbers) {
+  int version = GetParam();
+  if (version >= 3) {
+    GTEST_SKIP() << "Different behavior in V3";
+  }
+
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet",
+                                          
PartitionValues(std::vector<Literal>{}),
+                                          unpartitioned_spec_->spec_id());
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet",
+                                           
PartitionValues(std::vector<Literal>{}),
+                                           unpartitioned_spec_->spec_id());
+
+  // Both deletes have the same sequence number (1) as the data file queried below
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
eq_delete));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      unpartitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // Data file with sequence number 1 should only have position deletes applied (the equal-seq equality delete is skipped)
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(1, 
*unpartitioned_file_));
+  EXPECT_EQ(deletes.size(), 1);
+  EXPECT_EQ(deletes[0]->file_path, "/path/to/pos-delete.parquet");
+}
+
+// Exercises internal::PositionDeletes on its own. Entries are added out of
+// sequence-number order; Filter(seq) is then expected to return exactly the
+// files whose data sequence number is >= seq.
+TEST_P(DeleteFileIndexTest, TestPositionDeletesGroup) {
+  internal::PositionDeletes group;
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto file1 = MakePositionDeleteFile("/path/to/pos-delete-1.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+  auto file2 = MakePositionDeleteFile("/path/to/pos-delete-2.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+  auto file3 = MakePositionDeleteFile("/path/to/pos-delete-3.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+  auto file4 = MakePositionDeleteFile("/path/to/pos-delete-4.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+
+  // Add files out of order
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 4, file4)), IsOk());
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 2, file2)), IsOk());
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 1, file1)), IsOk());
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 3, file3)), IsOk());
+
+  // Group must not be empty
+  EXPECT_FALSE(group.empty());
+
+  // All files must be reported as referenced
+  auto referenced = group.ReferencedDeleteFiles();
+  EXPECT_EQ(referenced.size(), 4);
+  EXPECT_THAT(GetPaths(referenced),
+              testing::UnorderedElementsAre(
+                  "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet",
+                  "/path/to/pos-delete-3.parquet", 
"/path/to/pos-delete-4.parquet"));
+
+  // Position deletes are indexed by their data sequence numbers
+  {
+    auto filtered = group.Filter(0);
+    EXPECT_EQ(filtered.size(), 4);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre(
+                    "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet",
+                    "/path/to/pos-delete-3.parquet", 
"/path/to/pos-delete-4.parquet"));
+  }
+  {
+    auto filtered = group.Filter(1);
+    EXPECT_EQ(filtered.size(), 4);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre(
+                    "/path/to/pos-delete-1.parquet", 
"/path/to/pos-delete-2.parquet",
+                    "/path/to/pos-delete-3.parquet", 
"/path/to/pos-delete-4.parquet"));
+  }
+  {
+    auto filtered = group.Filter(2);
+    EXPECT_EQ(filtered.size(), 3);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre("/path/to/pos-delete-2.parquet",
+                                              "/path/to/pos-delete-3.parquet",
+                                              
"/path/to/pos-delete-4.parquet"));
+  }
+  {
+    auto filtered = group.Filter(3);
+    EXPECT_EQ(filtered.size(), 2);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre("/path/to/pos-delete-3.parquet",
+                                              
"/path/to/pos-delete-4.parquet"));
+  }
+  {
+    auto filtered = group.Filter(4);
+    // Fatal assertion: filtered[0] below must never be read from an empty
+    // result if the size check fails.
+    ASSERT_EQ(filtered.size(), 1);
+    EXPECT_EQ(filtered[0]->file_path, "/path/to/pos-delete-4.parquet");
+  }
+  {
+    auto filtered = group.Filter(5);
+    EXPECT_EQ(filtered.size(), 0);
+  }
+}
+
+// Exercises internal::EqualityDeletes on its own. Entries are added out of
+// sequence-number order; Filter(seq, file) is then expected to return exactly
+// the files whose data sequence number is strictly greater than seq.
+TEST_P(DeleteFileIndexTest, TestEqualityDeletesGroup) {
+  internal::EqualityDeletes group(*schema_);
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto file1 = MakeEqualityDeleteFile("/path/to/eq-delete-1.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+  auto file2 = MakeEqualityDeleteFile("/path/to/eq-delete-2.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+  auto file3 = MakeEqualityDeleteFile("/path/to/eq-delete-3.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+  auto file4 = MakeEqualityDeleteFile("/path/to/eq-delete-4.parquet", 
partition_a,
+                                      partitioned_spec_->spec_id());
+
+  // Add files out of order
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 4, file4)), IsOk());
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 2, file2)), IsOk());
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 1, file1)), IsOk());
+  EXPECT_THAT(group.Add(MakeDeleteEntry(1000L, 3, file3)), IsOk());
+
+  // Group must not be empty
+  EXPECT_FALSE(group.empty());
+
+  // All files must be reported as referenced
+  auto referenced = group.ReferencedDeleteFiles();
+  EXPECT_EQ(referenced.size(), 4);
+  EXPECT_THAT(GetPaths(referenced),
+              testing::UnorderedElementsAre(
+                  "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet",
+                  "/path/to/eq-delete-3.parquet", 
"/path/to/eq-delete-4.parquet"));
+
+  // Equality deletes are indexed by data sequence number - 1 to apply to next
+  // snapshots
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(0, *file_a_));
+    EXPECT_EQ(filtered.size(), 4);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre(
+                    "/path/to/eq-delete-1.parquet", 
"/path/to/eq-delete-2.parquet",
+                    "/path/to/eq-delete-3.parquet", 
"/path/to/eq-delete-4.parquet"));
+  }
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(1, *file_a_));
+    EXPECT_EQ(filtered.size(), 3);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre("/path/to/eq-delete-2.parquet",
+                                              "/path/to/eq-delete-3.parquet",
+                                              "/path/to/eq-delete-4.parquet"));
+  }
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(2, *file_a_));
+    EXPECT_EQ(filtered.size(), 2);
+    EXPECT_THAT(GetPaths(filtered),
+                testing::UnorderedElementsAre("/path/to/eq-delete-3.parquet",
+                                              "/path/to/eq-delete-4.parquet"));
+  }
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(3, *file_a_));
+    // Fatal assertion: filtered[0] below must never be read from an empty
+    // result if the size check fails.
+    ASSERT_EQ(filtered.size(), 1);
+    EXPECT_EQ(filtered[0]->file_path, "/path/to/eq-delete-4.parquet");
+  }
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto filtered, group.Filter(4, *file_a_));
+    EXPECT_EQ(filtered.size(), 0);
+  }
+}
+
+// Indexes a position delete and a DV for file_a_ together with two position
+// deletes for file_b_ (no DV). For file_a_ only the DV is expected back; for
+// file_b_ both position deletes are expected. Skipped below format version 3.
+TEST_P(DeleteFileIndexTest, TestMixDeleteFilesAndDVs) {
+  int version = GetParam();
+  if (version < 3) {
+    GTEST_SKIP() << "DVs only supported in V3+";
+  }
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto partition_b = PartitionValues({Literal::Int(1)});
+
+  // Position delete for file_a_
+  auto pos_delete_a = MakePositionDeleteFile("/path/to/pos-delete-a.parquet", 
partition_a,
+                                             partitioned_spec_->spec_id());
+  // DV for file_a_ (should take precedence)
+  auto dv_a = MakeDV("/path/to/dv-a.puffin", partition_a, 
partitioned_spec_->spec_id(),
+                     file_a_->file_path);
+  // Position deletes for file_b_ (no DV)
+  auto pos_delete_b1 = MakePositionDeleteFile("/path/to/pos-delete-b1.parquet",
+                                              partition_b, 
partitioned_spec_->spec_id());
+  auto pos_delete_b2 = MakePositionDeleteFile("/path/to/pos-delete-b2.parquet",
+                                              partition_b, 
partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete_a));
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/2, dv_a));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete_b1));
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/2, 
pos_delete_b2));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // Only DV should apply to file_a_
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+    // Fatal assertion: deletes[0] is dereferenced below.
+    ASSERT_EQ(deletes.size(), 1);
+    // DV has content_offset
+    EXPECT_TRUE(deletes[0]->content_offset.has_value());
+    EXPECT_EQ(deletes[0]->referenced_data_file, file_a_->file_path);
+    EXPECT_EQ(deletes[0]->file_path, "/path/to/dv-a.puffin");
+  }
+
+  // Two delete files should apply to file_b_
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_b_));
+    // Fatal assertion: deletes[0] and deletes[1] are dereferenced below.
+    ASSERT_EQ(deletes.size(), 2);
+    EXPECT_FALSE(deletes[0]->content_offset.has_value());  // Not DVs
+    EXPECT_FALSE(deletes[1]->content_offset.has_value());
+    EXPECT_THAT(GetPaths(deletes),
+                testing::UnorderedElementsAre("/path/to/pos-delete-b1.parquet",
+                                              
"/path/to/pos-delete-b2.parquet"));
+  }
+}
+
+// Two DVs that reference the same data file (file_a_) are written into one
+// manifest. BuildIndex is expected to fail with kValidationFailed and an error
+// message that names the data file. Skipped below format version 3.
+TEST_P(DeleteFileIndexTest, TestMultipleDVs) {
+  int version = GetParam();
+  if (version < 3) {
+    GTEST_SKIP() << "DVs only supported in V3+";
+  }
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+
+  auto dv1 = MakeDV("/path/to/dv1.puffin", partition_a, 
partitioned_spec_->spec_id(),
+                    file_a_->file_path);
+  auto dv2 = MakeDV("/path/to/dv2.puffin", partition_a, 
partitioned_spec_->spec_id(),
+                    file_a_->file_path);
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1, dv1));
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/2, dv2));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  auto index_result = BuildIndex({manifest});
+  EXPECT_THAT(index_result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(index_result, HasErrorMessage("Can't index multiple DVs"));
+  EXPECT_THAT(index_result, HasErrorMessage(file_a_->file_path));
+}
+
+// A DV written at sequence number 1 is looked up for a data file at sequence
+// number 2. Building the index succeeds, but ForDataFile is expected to return
+// kValidationFailed. Skipped below format version 3.
+TEST_P(DeleteFileIndexTest, TestInvalidDVSequenceNumber) {
+  int version = GetParam();
+  if (version < 3) {
+    GTEST_SKIP() << "DVs only supported in V3+";
+  }
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+
+  auto dv = MakeDV("/path/to/dv.puffin", partition_a, 
partitioned_spec_->spec_id(),
+                   file_a_->file_path);
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1, dv));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  // Querying with sequence number > DV sequence number should fail
+  auto result = index->ForDataFile(2, *file_a_);
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage(
+                          "must be greater than or equal to data file sequence 
number"));
+}
+
+// Builds an index from two manifests: a partitioned one holding an equality
+// and a position delete, and an unpartitioned one holding a single equality
+// delete. ReferencedDeleteFiles() is expected to report all three files.
+TEST_P(DeleteFileIndexTest, TestReferencedDeleteFiles) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", 
partition_a,
+                                          partitioned_spec_->spec_id());
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", 
partition_a,
+                                           partitioned_spec_->spec_id());
+  auto global_eq_delete = 
MakeEqualityDeleteFile("/path/to/global-eq-delete.parquet",
+                                                 
PartitionValues(std::vector<Literal>{}),
+                                                 
unpartitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> partition_entries;
+  partition_entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
eq_delete));
+  partition_entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete));
+
+  auto partition_manifest = WriteDeleteManifest(
+      version, /*snapshot_id=*/1000L, std::move(partition_entries), 
partitioned_spec_);
+
+  std::vector<ManifestEntry> global_entries;
+  global_entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1001L, /*sequence_number=*/2, 
global_eq_delete));
+
+  auto global_manifest = WriteDeleteManifest(
+      version, /*snapshot_id=*/1001L, std::move(global_entries), 
unpartitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({partition_manifest, 
global_manifest}));
+
+  auto referenced = index->ReferencedDeleteFiles();
+  EXPECT_EQ(referenced.size(), 3);
+  EXPECT_THAT(GetPaths(referenced),
+              testing::UnorderedElementsAre("/path/to/eq-delete.parquet",
+                                            "/path/to/pos-delete.parquet",
+                                            
"/path/to/global-eq-delete.parquet"));
+}
+
+// Writes one equality and one position delete with ManifestStatus::kExisting
+// and expects both to be indexed and returned for file_a_.
+TEST_P(DeleteFileIndexTest, TestExistingDeleteFiles) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto eq_delete = MakeEqualityDeleteFile("/path/to/eq-delete.parquet", 
partition_a,
+                                          partitioned_spec_->spec_id());
+  auto pos_delete = MakePositionDeleteFile("/path/to/pos-delete.parquet", 
partition_a,
+                                           partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  // Use ManifestStatus::kExisting to simulate files that were merged from a 
previous
+  // manifest
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                                    eq_delete, ManifestStatus::kExisting));
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                                    pos_delete, ManifestStatus::kExisting));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+  EXPECT_TRUE(index->has_position_deletes());
+
+  // Both delete files should be correctly loaded and applied to file_a_
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+  EXPECT_EQ(deletes.size(), 2);
+  EXPECT_THAT(GetPaths(deletes),
+              testing::UnorderedElementsAre("/path/to/eq-delete.parquet",
+                                            "/path/to/pos-delete.parquet"));
+}
+
+// Writes an added and a deleted entry for both an equality and a position
+// delete. Only the two kAdded files are expected back for file_a_; the
+// kDeleted entries must not appear.
+TEST_P(DeleteFileIndexTest, TestDeletedStatusExcluded) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+  auto eq_delete_added = MakeEqualityDeleteFile(
+      "/path/to/eq-delete-added.parquet", partition_a, 
partitioned_spec_->spec_id());
+  auto eq_delete_deleted = MakeEqualityDeleteFile(
+      "/path/to/eq-delete-deleted.parquet", partition_a, 
partitioned_spec_->spec_id());
+  auto pos_delete_added = MakePositionDeleteFile(
+      "/path/to/pos-delete-added.parquet", partition_a, 
partitioned_spec_->spec_id());
+  auto pos_delete_deleted = MakePositionDeleteFile(
+      "/path/to/pos-delete-deleted.parquet", partition_a, 
partitioned_spec_->spec_id());
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                                    eq_delete_added, ManifestStatus::kAdded));
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                                    eq_delete_deleted, 
ManifestStatus::kDeleted));
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                                    pos_delete_added, ManifestStatus::kAdded));
+  entries.push_back(MakeDeleteEntry(/*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                                    pos_delete_deleted, 
ManifestStatus::kDeleted));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+  EXPECT_TRUE(index->has_position_deletes());
+
+  // Only the non-deleted (ADDED) delete files should be loaded
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+  EXPECT_EQ(deletes.size(), 2);
+  EXPECT_THAT(GetPaths(deletes),
+              testing::UnorderedElementsAre("/path/to/eq-delete-added.parquet",
+                                            
"/path/to/pos-delete-added.parquet"));
+}
+
+// Indexes a position delete file that carries stats for both the
+// delete-file-path column and the position column. The file returned by the
+// index is expected to keep record_count and to keep stats only for the
+// delete-file-path column.
+TEST_P(DeleteFileIndexTest, TestPositionDeleteDiscardMetrics) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+
+  constexpr int32_t kDeleteFilePathFieldId = 
MetadataColumns::kDeleteFilePathColumnId;
+  constexpr int32_t kPositionFieldId = MetadataColumns::kFilePositionColumnId;
+
+  // Create a position delete file with full metrics
+  auto pos_delete = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kPositionDeletes,
+      .file_path = "/path/to/pos-delete-with-metrics.parquet",
+      .file_format = FileFormatType::kParquet,
+      .partition = partition_a,
+      .record_count = 100,
+      .file_size_in_bytes = 1024,
+      // Add stats for multiple columns
+      .column_sizes = {{kDeleteFilePathFieldId, 100}, {kPositionFieldId, 200}},
+      .value_counts = {{kDeleteFilePathFieldId, 10}, {kPositionFieldId, 20}},
+      .null_value_counts = {{kDeleteFilePathFieldId, 1}, {kPositionFieldId, 
2}},
+      .nan_value_counts = {{kDeleteFilePathFieldId, 0}, {kPositionFieldId, 0}},
+      .lower_bounds = {{kDeleteFilePathFieldId, {0x01}}, {kPositionFieldId, 
{0x02}}},
+      .upper_bounds = {{kDeleteFilePathFieldId, {0xFF}}, {kPositionFieldId, 
{0xFE}}},
+      .partition_spec_id = partitioned_spec_->spec_id(),
+  });
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
pos_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_position_deletes());
+
+  // Get the delete files from the index
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+  ASSERT_EQ(deletes.size(), 1);
+
+  const auto& returned_file = deletes[0];
+  EXPECT_EQ(returned_file->file_path, 
"/path/to/pos-delete-with-metrics.parquet");
+  // record_count should be preserved
+  EXPECT_EQ(returned_file->record_count, 100);
+  // Stats maps should only contain entries for delete file path.
+  EXPECT_EQ(returned_file->column_sizes.size(), 1);
+  EXPECT_EQ(returned_file->value_counts.size(), 1);
+  EXPECT_EQ(returned_file->null_value_counts.size(), 1);
+  EXPECT_EQ(returned_file->nan_value_counts.size(), 1);
+  EXPECT_EQ(returned_file->lower_bounds.size(), 1);
+  EXPECT_EQ(returned_file->upper_bounds.size(), 1);
+  EXPECT_TRUE(returned_file->column_sizes.contains(kDeleteFilePathFieldId));
+  EXPECT_TRUE(returned_file->value_counts.contains(kDeleteFilePathFieldId));
+  
EXPECT_TRUE(returned_file->null_value_counts.contains(kDeleteFilePathFieldId));
+  
EXPECT_TRUE(returned_file->nan_value_counts.contains(kDeleteFilePathFieldId));
+  EXPECT_TRUE(returned_file->lower_bounds.contains(kDeleteFilePathFieldId));
+  EXPECT_TRUE(returned_file->upper_bounds.contains(kDeleteFilePathFieldId));
+}
+
+// Indexes an equality delete file that carries stats for field ids 1, 2 and 3
+// but declares only field id 1 in equality_ids. The file returned by the index
+// is expected to keep record_count and to keep stats only for field id 1.
+TEST_P(DeleteFileIndexTest, TestEqualityDeleteDiscardMetrics) {
+  int version = GetParam();
+
+  auto partition_a = PartitionValues({Literal::Int(0)});
+
+  // Create an equality delete file with full metrics
+  auto eq_delete = std::make_shared<DataFile>(DataFile{
+      .content = DataFile::Content::kEqualityDeletes,
+      .file_path = "/path/to/eq-delete-with-metrics.parquet",
+      .file_format = FileFormatType::kParquet,
+      .partition = partition_a,
+      .record_count = 50,
+      .file_size_in_bytes = 512,
+      // Add stats for multiple columns
+      .column_sizes = {{1, 100}, {2, 200}, {3, 300}},
+      .value_counts = {{1, 10}, {2, 20}, {3, 30}},
+      .null_value_counts = {{1, 1}, {2, 2}, {3, 3}},
+      .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}},
+      .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}},
+      .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}},
+      .equality_ids = {1},  // equality field IDs
+      .partition_spec_id = partitioned_spec_->spec_id(),
+  });
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(
+      MakeDeleteEntry(/*snapshot_id=*/1000L, /*sequence_number=*/1, 
eq_delete));
+
+  auto manifest = WriteDeleteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries),
+                                      partitioned_spec_);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto index, BuildIndex({manifest}));
+
+  EXPECT_TRUE(index->has_equality_deletes());
+
+  // Get the delete files from the index
+  ICEBERG_UNWRAP_OR_FAIL(auto deletes, index->ForDataFile(0, *file_a_));
+  ASSERT_EQ(deletes.size(), 1);
+
+  const auto& returned_file = deletes[0];
+  EXPECT_EQ(returned_file->file_path, 
"/path/to/eq-delete-with-metrics.parquet");
+  // record_count should be preserved
+  EXPECT_EQ(returned_file->record_count, 50);
+  // Stats maps should only contain entries for equality field IDs.
+  EXPECT_EQ(returned_file->column_sizes.size(), 1);
+  EXPECT_EQ(returned_file->value_counts.size(), 1);
+  EXPECT_EQ(returned_file->null_value_counts.size(), 1);
+  EXPECT_EQ(returned_file->nan_value_counts.size(), 1);
+  EXPECT_EQ(returned_file->lower_bounds.size(), 1);
+  EXPECT_EQ(returned_file->upper_bounds.size(), 1);
+  EXPECT_TRUE(returned_file->column_sizes.contains(1));
+  EXPECT_TRUE(returned_file->value_counts.contains(1));
+  EXPECT_TRUE(returned_file->null_value_counts.contains(1));
+  EXPECT_TRUE(returned_file->nan_value_counts.contains(1));
+  EXPECT_TRUE(returned_file->lower_bounds.contains(1));
+  EXPECT_TRUE(returned_file->upper_bounds.contains(1));
+}
+
+// Runs every DeleteFileIndexTest case once per parameter value (2 and 3); the
+// tests read it through GetParam() as the version passed to the manifest
+// writer, and DV-specific cases skip themselves when it is below 3.
+INSTANTIATE_TEST_SUITE_P(DeleteFileIndexVersions, DeleteFileIndexTest,
+                         testing::Values(2, 3));
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/manifest_reader_test.cc 
b/src/iceberg/test/manifest_reader_test.cc
index ab4798db..e8a1a511 100644
--- a/src/iceberg/test/manifest_reader_test.cc
+++ b/src/iceberg/test/manifest_reader_test.cc
@@ -375,6 +375,71 @@ TEST_P(TestManifestReader, TestInvalidUsage) {
   EXPECT_THAT(reader_result, HasErrorMessage("has no snapshot ID"));
 }
 
+// Writes a manifest whose single data file has stats for three columns, then
+// reads it back selecting only "record_count" with TryDropStats() enabled.
+// record_count is expected to survive while all six stats maps come back empty.
+TEST_P(TestManifestReader, TestDropStats) {
+  int version = GetParam();
+
+  // Create a data file with full metrics
+  auto file_with_stats = std::make_unique<DataFile>(DataFile{
+      .file_path = "/path/to/data-with-stats.parquet",
+      .file_format = FileFormatType::kParquet,
+      .partition = PartitionValues({Literal::Int(0)}),
+      .record_count = 100,
+      .file_size_in_bytes = 1024,
+      // Add stats for multiple columns
+      .column_sizes = {{1, 100}, {2, 200}, {3, 300}},
+      .value_counts = {{1, 10}, {2, 20}, {3, 30}},
+      .null_value_counts = {{1, 1}, {2, 2}, {3, 3}},
+      .nan_value_counts = {{1, 0}, {2, 0}, {3, 0}},
+      .lower_bounds = {{1, {0x01}}, {2, {0x02}}, {3, {0x03}}},
+      .upper_bounds = {{1, {0xFF}}, {2, {0xFE}}, {3, {0xFD}}},
+      .sort_order_id = 0,
+  });
+
+  auto entry = MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L,
+                         std::move(file_with_stats));
+
+  std::vector<ManifestEntry> entries;
+  entries.push_back(std::move(entry));
+
+  auto manifest = WriteManifest(version, /*snapshot_id=*/1000L, 
std::move(entries));
+
+  auto reader_result = ManifestReader::Make(manifest, file_io_, schema_, 
spec_);
+  ASSERT_THAT(reader_result, IsOk());
+  auto reader = std::move(reader_result.value());
+  reader->Select({"record_count"}).TryDropStats();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto read_entries, reader->Entries());
+  ASSERT_EQ(read_entries.size(), 1);
+  const auto& read_entry = read_entries[0];
+
+  // record_count should be preserved
+  EXPECT_EQ(read_entry.data_file->record_count, 100);
+
+  // Stats maps should be cleared
+  EXPECT_TRUE(read_entry.data_file->column_sizes.empty());
+  EXPECT_TRUE(read_entry.data_file->value_counts.empty());
+  EXPECT_TRUE(read_entry.data_file->null_value_counts.empty());
+  EXPECT_TRUE(read_entry.data_file->nan_value_counts.empty());
+  EXPECT_TRUE(read_entry.data_file->lower_bounds.empty());
+  EXPECT_TRUE(read_entry.data_file->upper_bounds.empty());
+}
+
+// Pins ManifestReader::ShouldDropStats for a range of column selections:
+// expected false for an empty selection, for the all-columns selection, and
+// whenever a stats column such as value_counts or lower_bounds is selected;
+// expected true when only non-stats columns are selected.
+TEST(ManifestReaderStaticTest, TestShouldDropStats) {
+  EXPECT_FALSE(ManifestReader::ShouldDropStats({}));
+  
EXPECT_FALSE(ManifestReader::ShouldDropStats({std::string(Schema::kAllColumns)}));
+  EXPECT_TRUE(ManifestReader::ShouldDropStats({"file_path", "file_format", 
"partition"}));
+  EXPECT_TRUE(
+      ManifestReader::ShouldDropStats({"file_path", "file_format", 
"record_count"}));
+  EXPECT_FALSE(
+      ManifestReader::ShouldDropStats({"file_path", "file_format", 
"value_counts"}));
+  EXPECT_FALSE(
+      ManifestReader::ShouldDropStats({"file_path", "file_format", 
"lower_bounds"}));
+  EXPECT_FALSE(ManifestReader::ShouldDropStats(
+      {"file_path", "value_counts", "null_value_counts", "lower_bounds"}));
+  EXPECT_FALSE(
+      ManifestReader::ShouldDropStats({"file_path", "record_count", 
"value_counts"}));
+}
+
 INSTANTIATE_TEST_SUITE_P(ManifestReaderVersions, TestManifestReader,
                          testing::Values(1, 2, 3));
 
diff --git a/src/iceberg/util/content_file_util.cc 
b/src/iceberg/util/content_file_util.cc
new file mode 100644
index 00000000..89c87508
--- /dev/null
+++ b/src/iceberg/util/content_file_util.cc
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/util/content_file_util.h"
+
+#include <format>
+
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/metadata_columns.h"
+#include "iceberg/util/conversions.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+// A file is treated as a deletion vector exactly when its file format is
+// Puffin; the content type is not consulted.
+bool ContentFileUtil::IsDV(const DataFile& file) {
+  return file.file_format == FileFormatType::kPuffin;
+}
+
+// Resolves the single data file path a delete file applies to, if there is one.
+//
+// Returns std::nullopt for equality deletes. Otherwise returns the explicit
+// referenced_data_file when it is set. Failing that, it falls back to the
+// bounds recorded for the delete-file-path column: when non-empty lower and
+// upper bounds are both present and byte-equal, that bound is decoded as a
+// string and returned. In every other case the result is std::nullopt.
+// A failure while decoding the bound is presumably handed back to the caller by
+// ICEBERG_ASSIGN_OR_RAISE (macro defined outside this file).
+Result<std::optional<std::string>> ContentFileUtil::ReferencedDataFile(
+    const DataFile& file) {
+  // Equality deletes don't reference a specific data file
+  if (file.content == DataFile::Content::kEqualityDeletes) {
+    return std::nullopt;
+  }
+
+  // If referenced_data_file is set, return it
+  if (file.referenced_data_file.has_value()) {
+    return file.referenced_data_file;
+  }
+
+  // Try to derive from lower/upper bounds on file_path column
+  auto lower_it = 
file.lower_bounds.find(MetadataColumns::kDeleteFilePathColumnId);
+  if (lower_it == file.lower_bounds.end() || lower_it->second.empty()) {
+    return std::nullopt;
+  }
+
+  auto upper_it = 
file.upper_bounds.find(MetadataColumns::kDeleteFilePathColumnId);
+  if (upper_it == file.upper_bounds.end() || upper_it->second.empty()) {
+    return std::nullopt;
+  }
+
+  // Check if lower and upper bounds are equal
+  if (lower_it->second == upper_it->second) {
+    // Convert the binary bound to a string
+    ICEBERG_ASSIGN_OR_RAISE(auto string_literal,
+                            Conversions::FromBytes(*string(), 
lower_it->second));
+    return std::get<std::string>(string_literal);
+  }
+
+  return std::nullopt;
+}
+
+// A delete file counts as file-scoped when ReferencedDataFile() resolves to a
+// path; any error from that lookup is passed on instead of a bool.
+Result<bool> ContentFileUtil::IsFileScoped(const DataFile& file) {
+  ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, ReferencedDataFile(file));
+  return referenced_data_file.has_value();
+}
+
+// True only when `files` holds exactly one element and that element is a DV.
+// The size check runs first, so files[0] is never dereferenced on an empty
+// span. NOTE(review): assumes the single pointer is non-null - confirm at
+// call sites.
+bool ContentFileUtil::ContainsSingleDV(std::span<const 
std::shared_ptr<DataFile>> files) {
+  return files.size() == 1 && IsDV(*files[0]);
+}
+
+// Formats a one-line description of a DV with its location, content offset,
+// content length and referenced data file. A missing offset or length is
+// printed as -1 and a missing referenced data file as an empty string.
+std::string ContentFileUtil::DVDesc(const DataFile& file) {
+  return std::format("DV{{location={}, offset={}, length={}, 
referencedDataFile={}}}",
+                     file.file_path, file.content_offset.value_or(-1),
+                     file.content_size_in_bytes.value_or(-1),
+                     file.referenced_data_file.value_or(""));
+}
+
+// Empties all six per-column stats maps of `data_file` in place. No other
+// field (record_count, file_path, ...) is touched.
+void ContentFileUtil::DropAllStats(DataFile& data_file) {
+  data_file.column_sizes.clear();
+  data_file.value_counts.clear();
+  data_file.null_value_counts.clear();
+  data_file.nan_value_counts.clear();
+  data_file.lower_bounds.clear();
+  data_file.upper_bounds.clear();
+}
+
+namespace {
+
+// Erases from `map`, in place, every entry whose key is not in `columns`.
+// Entries whose key is in `columns` are left untouched.
+template <typename V>
+void DropUnselectedColumnStats(std::map<int32_t, V>& map,
+                               const std::unordered_set<int32_t>& columns) {
+  for (auto it = map.begin(); it != map.end();) {
+    if (columns.find(it->first) == columns.end()) {
+      // erase() returns the iterator following the removed entry, so the loop
+      // must not advance again on this path.
+      it = map.erase(it);
+    } else {
+      ++it;
+    }
+  }
+}
+
+}  // namespace
+
+// Applies DropUnselectedColumnStats to each of the six per-column stats maps of
+// `data_file`, so that only entries keyed by an id in `selected_columns`
+// remain. An empty `selected_columns` therefore empties every stats map.
+void ContentFileUtil::DropUnselectedStats(
+    DataFile& data_file, const std::unordered_set<int32_t>& selected_columns) {
+  DropUnselectedColumnStats<int64_t>(data_file.column_sizes, selected_columns);
+  DropUnselectedColumnStats<int64_t>(data_file.value_counts, selected_columns);
+  DropUnselectedColumnStats<int64_t>(data_file.null_value_counts, 
selected_columns);
+  DropUnselectedColumnStats<int64_t>(data_file.nan_value_counts, 
selected_columns);
+  DropUnselectedColumnStats<std::vector<uint8_t>>(data_file.lower_bounds,
+                                                  selected_columns);
+  DropUnselectedColumnStats<std::vector<uint8_t>>(data_file.upper_bounds,
+                                                  selected_columns);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/content_file_util.h 
b/src/iceberg/util/content_file_util.h
new file mode 100644
index 00000000..e173a41a
--- /dev/null
+++ b/src/iceberg/util/content_file_util.h
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/util/content_file_util.h
+/// Utility functions for content files (data files and delete files).
+
+#include <memory>
+#include <optional>
+#include <span>
+#include <string>
+#include <unordered_set>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Utility functions for content files.
+struct ICEBERG_EXPORT ContentFileUtil {
+  /// \brief Check if a delete file is a deletion vector (DV).
+  /// \return true when the file format is Puffin.
+  static bool IsDV(const DataFile& file);
+
+  /// \brief Get the referenced data file path from a position delete file.
+  ///
+  /// Uses referenced_data_file when set, otherwise falls back to the bounds of
+  /// the delete-file-path column when the lower and upper bound are equal.
+  /// \return The referenced path, or std::nullopt for equality deletes and for
+  /// files where no single path can be determined.
+  static Result<std::optional<std::string>> ReferencedDataFile(const DataFile& 
file);
+
+  /// \brief Check if a delete file is file-scoped.
+  /// \return true when ReferencedDataFile() yields a path.
+  static Result<bool> IsFileScoped(const DataFile& file);
+
+  /// \brief Check if a collection of delete files consists of exactly one file
+  /// and that file is a DV.
+  static bool ContainsSingleDV(std::span<const std::shared_ptr<DataFile>> 
files);
+
+  /// \brief Generate a description string for a deletion vector.
+  static std::string DVDesc(const DataFile& file);
+
+  /// \brief In-place drop stats.
+  ///
+  /// Clears column_sizes, value_counts, null_value_counts, nan_value_counts,
+  /// lower_bounds and upper_bounds.
+  static void DropAllStats(DataFile& data_file);
+
+  /// \brief Preserve stats based on selected columns.
+  ///
+  /// Removes, in place, every stats entry whose key is not contained in
+  /// selected_columns.
+  static void DropUnselectedStats(DataFile& data_file,
+                                  const std::unordered_set<int32_t>& 
selected_columns);
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/meson.build b/src/iceberg/util/meson.build
index 9f327753..48477925 100644
--- a/src/iceberg/util/meson.build
+++ b/src/iceberg/util/meson.build
@@ -20,6 +20,7 @@ install_headers(
         'bucket_util.h',
         'checked_cast.h',
         'config.h',
+        'content_file_util.h',
         'conversions.h',
         'decimal.h',
         'endian.h',

Reply via email to