This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b9ce88f9 feat: implement manifest group (#455)
b9ce88f9 is described below

commit b9ce88f91c76f051340e035f0595eaeb3726e694
Author: Gang Wu <[email protected]>
AuthorDate: Wed Dec 31 14:22:53 2025 +0800

    feat: implement manifest group (#455)
---
 src/iceberg/CMakeLists.txt                 |   1 +
 src/iceberg/constants.h                    |  12 +
 src/iceberg/delete_file_index.cc           |  36 +-
 src/iceberg/delete_file_index.h            |  25 +-
 src/iceberg/manifest/manifest_adapter.cc   |  91 +++--
 src/iceberg/manifest/manifest_entry.h      |  25 +-
 src/iceberg/manifest/manifest_group.cc     | 367 ++++++++++++++++++++
 src/iceberg/manifest/manifest_group.h      | 167 +++++++++
 src/iceberg/manifest/manifest_writer.cc    |  12 +-
 src/iceberg/manifest/meson.build           |   1 +
 src/iceberg/manifest/v2_metadata.cc        |   5 +-
 src/iceberg/manifest/v3_metadata.cc        |   5 +-
 src/iceberg/meson.build                    |   1 +
 src/iceberg/table_scan.cc                  |  28 +-
 src/iceberg/table_scan.h                   |  25 +-
 src/iceberg/test/CMakeLists.txt            |   1 +
 src/iceberg/test/delete_file_index_test.cc |   6 +-
 src/iceberg/test/manifest_group_test.cc    | 521 +++++++++++++++++++++++++++++
 src/iceberg/type_fwd.h                     |   8 +
 19 files changed, 1234 insertions(+), 103 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index bf24ae32..bc7182ae 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -42,6 +42,7 @@ set(ICEBERG_SOURCES
     json_internal.cc
     manifest/manifest_adapter.cc
     manifest/manifest_entry.cc
+    manifest/manifest_group.cc
     manifest/manifest_list.cc
     manifest/manifest_reader.cc
     manifest/manifest_writer.cc
diff --git a/src/iceberg/constants.h b/src/iceberg/constants.h
index 759ae3cc..89001f09 100644
--- a/src/iceberg/constants.h
+++ b/src/iceberg/constants.h
@@ -19,6 +19,12 @@
 
 #pragma once
 
+/// \file iceberg/constants.h
+/// This file defines constants used commonly and shared across multiple
+/// source files.  It is mostly useful to add constants that are used as
+/// default values in the class definitions in the header files without
+/// including other headers just for the constant definitions.
+
 #include <cstdint>
 #include <string_view>
 
@@ -26,5 +32,11 @@ namespace iceberg {
 
 constexpr std::string_view kParquetFieldIdKey = "PARQUET:field_id";
 constexpr int64_t kInvalidSnapshotId = -1;
+/// \brief Stand-in for the current sequence number that will be assigned when the commit
+/// is successful. This is replaced when writing a manifest list by the ManifestFile
+/// adapter.
+constexpr int64_t kUnassignedSequenceNumber = -1;
+
+// TODO(gangwu): move other commonly used constants here.
 
 }  // namespace iceberg
diff --git a/src/iceberg/delete_file_index.cc b/src/iceberg/delete_file_index.cc
index 2a96e9d3..7c1a12f0 100644
--- a/src/iceberg/delete_file_index.cc
+++ b/src/iceberg/delete_file_index.cc
@@ -453,34 +453,32 @@ Result<std::shared_ptr<DataFile>> DeleteFileIndex::FindDV(
 }
 
 Result<DeleteFileIndex::Builder> DeleteFileIndex::BuilderFor(
-    std::shared_ptr<FileIO> io, std::vector<ManifestFile> delete_manifests) {
+    std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+    std::vector<ManifestFile> delete_manifests) {
   ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
-  return Builder(std::move(io), std::move(delete_manifests));
+  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+  ICEBERG_PRECHECK(!specs_by_id.empty(), "Partition specs cannot be empty");
+  return Builder(std::move(io), std::move(schema), std::move(specs_by_id),
+                 std::move(delete_manifests));
 }
 
 // Builder implementation
 
-DeleteFileIndex::Builder::Builder(std::shared_ptr<FileIO> io,
-                                  std::vector<ManifestFile> delete_manifests)
-    : io_(std::move(io)), delete_manifests_(std::move(delete_manifests)) {}
+DeleteFileIndex::Builder::Builder(
+    std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+    std::vector<ManifestFile> delete_manifests)
+    : io_(std::move(io)),
+      schema_(std::move(schema)),
+      specs_by_id_(std::move(specs_by_id)),
+      delete_manifests_(std::move(delete_manifests)) {}
 
 DeleteFileIndex::Builder::~Builder() = default;
 DeleteFileIndex::Builder::Builder(Builder&&) noexcept = default;
 DeleteFileIndex::Builder& DeleteFileIndex::Builder::operator=(Builder&&) noexcept =
     default;
 
-DeleteFileIndex::Builder& DeleteFileIndex::Builder::SpecsById(
-    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id) {
-  specs_by_id_ = std::move(specs_by_id);
-  return *this;
-}
-
-DeleteFileIndex::Builder& DeleteFileIndex::Builder::WithSchema(
-    std::shared_ptr<Schema> schema) {
-  schema_ = std::move(schema);
-  return *this;
-}
-
 DeleteFileIndex::Builder& DeleteFileIndex::Builder::AfterSequenceNumber(int64_t seq) {
   min_sequence_number_ = seq;
   return *this;
@@ -721,10 +719,6 @@ Status DeleteFileIndex::Builder::AddEqualityDelete(
 
 Result<std::unique_ptr<DeleteFileIndex>> DeleteFileIndex::Builder::Build() {
   ICEBERG_RETURN_UNEXPECTED(CheckErrors());
-  ICEBERG_PRECHECK(io_ != nullptr, "FileIO is required to load delete files");
-  ICEBERG_PRECHECK(schema_ != nullptr, "Schema is required to load delete files");
-  ICEBERG_PRECHECK(!specs_by_id_.empty(),
-                   "Partition specs are required to load delete files");
 
   std::vector<ManifestEntry> entries;
   if (!delete_manifests_.empty()) {
diff --git a/src/iceberg/delete_file_index.h b/src/iceberg/delete_file_index.h
index 03cc2661..5444281a 100644
--- a/src/iceberg/delete_file_index.h
+++ b/src/iceberg/delete_file_index.h
@@ -268,10 +268,14 @@ class ICEBERG_EXPORT DeleteFileIndex {
   /// \brief Create a builder for constructing a DeleteFileIndex from manifest 
files.
   ///
   /// \param io The FileIO to use for reading manifests
+  /// \param schema Current table schema
+  /// \param specs_by_id Partition specs by their IDs
   /// \param delete_manifests The delete manifests to index
   /// \return A Builder instance
-  static Result<Builder> BuilderFor(std::shared_ptr<FileIO> io,
-                                    std::vector<ManifestFile> delete_manifests);
+  static Result<Builder> BuilderFor(
+      std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+      std::vector<ManifestFile> delete_manifests);
 
  private:
   friend class Builder;
@@ -318,7 +322,9 @@ class ICEBERG_EXPORT DeleteFileIndex {
 class ICEBERG_EXPORT DeleteFileIndex::Builder : public ErrorCollector {
  public:
   /// \brief Construct a builder from manifest files.
-  Builder(std::shared_ptr<FileIO> io, std::vector<ManifestFile> 
delete_manifests);
+  Builder(std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+          std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> 
specs_by_id,
+          std::vector<ManifestFile> delete_manifests);
 
   ~Builder() override;
 
@@ -327,15 +333,6 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public 
ErrorCollector {
   Builder(const Builder&) = delete;
   Builder& operator=(const Builder&) = delete;
 
-  /// \brief Set the partition specs by ID.
-  Builder& SpecsById(
-      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id);
-
-  /// \brief Set the table schema.
-  ///
-  /// Required for filtering and expression evaluation.
-  Builder& WithSchema(std::shared_ptr<Schema> schema);
-
   /// \brief Set the minimum sequence number for delete files.
   ///
   /// Only delete files with sequence number > min_sequence_number will be 
included.
@@ -384,10 +381,10 @@ class ICEBERG_EXPORT DeleteFileIndex::Builder : public 
ErrorCollector {
       ManifestEntry&& entry);
 
   std::shared_ptr<FileIO> io_;
+  std::shared_ptr<Schema> schema_;
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
   std::vector<ManifestFile> delete_manifests_;
   int64_t min_sequence_number_ = 0;
-  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
-  std::shared_ptr<Schema> schema_;
   std::shared_ptr<Expression> data_filter_;
   std::shared_ptr<Expression> partition_filter_;
   std::shared_ptr<PartitionSet> partition_set_;
diff --git a/src/iceberg/manifest/manifest_adapter.cc 
b/src/iceberg/manifest/manifest_adapter.cc
index da75a468..cf0a0515 100644
--- a/src/iceberg/manifest/manifest_adapter.cc
+++ b/src/iceberg/manifest/manifest_adapter.cc
@@ -36,6 +36,7 @@ namespace iceberg {
 namespace {
 
 constexpr int64_t kBlockSizeInBytesV1 = 64 * 1024 * 1024L;
+constexpr int32_t kBlockSizeInBytesFieldId = 105;
 
 Status AppendField(ArrowArray* array, int64_t value) {
   ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendInt(array, value));
@@ -254,64 +255,63 @@ Status ManifestEntryAdapter::AppendDataFile(
     auto child_array = array->children[i];
 
     switch (field.field_id()) {
-      case 134:  // content (optional int32)
+      case DataFile::kContentFieldId:  // optional int32
         ICEBERG_RETURN_UNEXPECTED(
             AppendField(child_array, static_cast<int64_t>(file.content)));
         break;
-      case 100:  // file_path (required string)
+      case DataFile::kFilePathFieldId:  // required string
         ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.file_path));
         break;
-      case 101:  // file_format (required string)
+      case DataFile::kFileFormatFieldId:  // required string
         ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, 
ToString(file.file_format)));
         break;
-      case 102: {
-        // partition (required struct)
+      case DataFile::kPartitionFieldId: {  // required struct
         auto partition_type = 
internal::checked_pointer_cast<StructType>(field.type());
         ICEBERG_RETURN_UNEXPECTED(
             AppendPartitionValues(child_array, partition_type, 
file.partition));
       } break;
-      case 103:  // record_count (required int64)
+      case DataFile::kRecordCountFieldId:  // required int64
         ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, file.record_count));
         break;
-      case 104:  // file_size_in_bytes (required int64)
+      case DataFile::kFileSizeFieldId:  // required int64
         ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, 
file.file_size_in_bytes));
         break;
-      case 105:  // block_size_in_bytes (compatible in v1)
+      case kBlockSizeInBytesFieldId:  // compatible with v1
         // always 64MB for v1
         ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, 
kBlockSizeInBytesV1));
         break;
-      case 108:  // column_sizes (optional map)
+      case DataFile::kColumnSizesFieldId:  // optional map
         ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.column_sizes));
         break;
-      case 109:  // value_counts (optional map)
+      case DataFile::kValueCountsFieldId:  // optional map
         ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.value_counts));
         break;
-      case 110:  // null_value_counts (optional map)
+      case DataFile::kNullValueCountsFieldId:  // optional map
         ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, 
file.null_value_counts));
         break;
-      case 137:  // nan_value_counts (optional map)
+      case DataFile::kNanValueCountsFieldId:  // optional map
         ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, 
file.nan_value_counts));
         break;
-      case 125:  // lower_bounds (optional map)
+      case DataFile::kLowerBoundsFieldId:  // optional map
         ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.lower_bounds));
         break;
-      case 128:  // upper_bounds (optional map)
+      case DataFile::kUpperBoundsFieldId:  // optional map
         ICEBERG_RETURN_UNEXPECTED(AppendMap(child_array, file.upper_bounds));
         break;
-      case 131:  // key_metadata (optional binary)
+      case DataFile::kKeyMetadataFieldId:  // optional binary
         if (!file.key_metadata.empty()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, 
file.key_metadata));
         } else {
           
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
         }
         break;
-      case 132:  // split_offsets (optional list)
+      case DataFile::kSplitOffsetsFieldId:  // optional list
         ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.split_offsets));
         break;
-      case 135:  // equality_ids (optional list)
+      case DataFile::kEqualityIdsFieldId:  // optional list
         ICEBERG_RETURN_UNEXPECTED(AppendList(child_array, file.equality_ids));
         break;
-      case 140:  // sort_order_id (optional int32)
+      case DataFile::kSortOrderIdFieldId:  // optional int32
         if (file.sort_order_id.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
               AppendField(child_array, 
static_cast<int64_t>(file.sort_order_id.value())));
@@ -319,15 +319,14 @@ Status ManifestEntryAdapter::AppendDataFile(
           
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
         }
         break;
-      case 142:  // first_row_id (optional int64)
+      case DataFile::kFirstRowIdFieldId:  // optional int64
         if (file.first_row_id.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(child_array, 
file.first_row_id.value()));
         } else {
           
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
         }
         break;
-      case 143: {
-        // referenced_data_file (optional string)
+      case DataFile::kReferencedDataFileFieldId: {  // optional string
         ICEBERG_ASSIGN_OR_RAISE(auto referenced_data_file, 
GetReferenceDataFile(file));
         if (referenced_data_file.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
@@ -337,7 +336,7 @@ Status ManifestEntryAdapter::AppendDataFile(
         }
         break;
       }
-      case 144:  // content_offset (optional int64)
+      case DataFile::kContentOffsetFieldId:  // optional int64
         if (file.content_offset.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
               AppendField(child_array, file.content_offset.value()));
@@ -345,7 +344,7 @@ Status ManifestEntryAdapter::AppendDataFile(
           
ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(child_array, 1));
         }
         break;
-      case 145:  // content_size_in_bytes (optional int64)
+      case DataFile::kContentSizeFieldId:  // optional int64
         if (file.content_size_in_bytes.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
               AppendField(child_array, file.content_size_in_bytes.value()));
@@ -397,18 +396,18 @@ Status ManifestEntryAdapter::AppendInternal(const 
ManifestEntry& entry) {
     auto array = array_.children[i];
 
     switch (field.field_id()) {
-      case 0:  // status (required int32)
+      case ManifestEntry::kStatusFieldId:  // required int32
         ICEBERG_RETURN_UNEXPECTED(
             AppendField(array, 
static_cast<int64_t>(static_cast<int32_t>(entry.status))));
         break;
-      case 1:  // snapshot_id (optional int64)
+      case ManifestEntry::kSnapshotIdFieldId:  // optional int64
         if (entry.snapshot_id.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, 
entry.snapshot_id.value()));
         } else {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 2:  // data_file (required struct)
+      case ManifestEntry::kDataFileFieldId:  // required struct
         if (entry.data_file) {
           // Get the data file type from the field
           auto data_file_type = 
internal::checked_pointer_cast<StructType>(field.type());
@@ -418,8 +417,7 @@ Status ManifestEntryAdapter::AppendInternal(const 
ManifestEntry& entry) {
           return InvalidManifest("Missing required data_file field from 
manifest entry.");
         }
         break;
-      case 3: {
-        // sequence_number (optional int64)
+      case ManifestEntry::kSequenceNumberFieldId: {  // optional int64
         ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(entry));
         if (sequence_num.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num.value()));
@@ -428,7 +426,7 @@ Status ManifestEntryAdapter::AppendInternal(const 
ManifestEntry& entry) {
         }
         break;
       }
-      case 4:  // file_sequence_number (optional int64)
+      case ManifestEntry::kFileSequenceNumberFieldId:  // optional int64
         if (entry.file_sequence_number.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
               AppendField(array, entry.file_sequence_number.value()));
@@ -538,36 +536,34 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
     const auto& field = fields[i];
     auto array = array_.children[i];
     switch (field.field_id()) {
-      case 500:  // manifest_path
+      case ManifestFile::kManifestPathFieldId:
         ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_path));
         break;
-      case 501:  // manifest_length
+      case ManifestFile::kManifestLengthFieldId:
         ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.manifest_length));
         break;
-      case 502:  // partition_spec_id
+      case ManifestFile::kPartitionSpecIdFieldId:
         ICEBERG_RETURN_UNEXPECTED(
             AppendField(array, static_cast<int64_t>(file.partition_spec_id)));
         break;
-      case 517:  // content
+      case ManifestFile::kContentFieldId:
         ICEBERG_RETURN_UNEXPECTED(
             AppendField(array, 
static_cast<int64_t>(static_cast<int32_t>(file.content))));
         break;
-      case 515: {
-        // sequence_number
+      case ManifestFile::kSequenceNumberFieldId: {
         ICEBERG_ASSIGN_OR_RAISE(auto sequence_num, GetSequenceNumber(file));
         ICEBERG_RETURN_UNEXPECTED(AppendField(array, sequence_num));
         break;
       }
-      case 516: {
-        // min_sequence_number
+      case ManifestFile::kMinSequenceNumberFieldId: {
         ICEBERG_ASSIGN_OR_RAISE(auto min_sequence_num, 
GetMinSequenceNumber(file));
         ICEBERG_RETURN_UNEXPECTED(AppendField(array, min_sequence_num));
         break;
       }
-      case 503:  // added_snapshot_id
+      case ManifestFile::kAddedSnapshotIdFieldId:
         ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.added_snapshot_id));
         break;
-      case 504:  // added_files_count
+      case ManifestFile::kAddedFilesCountFieldId:
         if (file.added_files_count.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
               AppendField(array, 
static_cast<int64_t>(file.added_files_count.value())));
@@ -576,7 +572,7 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 505:  // existing_files_count
+      case ManifestFile::kExistingFilesCountFieldId:
         if (file.existing_files_count.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(
               array, static_cast<int64_t>(file.existing_files_count.value())));
@@ -585,7 +581,7 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 506:  // deleted_files_count
+      case ManifestFile::kDeletedFilesCountFieldId:
         if (file.deleted_files_count.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(
               AppendField(array, 
static_cast<int64_t>(file.deleted_files_count.value())));
@@ -594,7 +590,7 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 512:  // added_rows_count
+      case ManifestFile::kAddedRowsCountFieldId:
         if (file.added_rows_count.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, 
file.added_rows_count.value()));
         } else {
@@ -602,7 +598,7 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 513:  // existing_rows_count
+      case ManifestFile::kExistingRowsCountFieldId:
         if (file.existing_rows_count.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, 
file.existing_rows_count.value()));
         } else {
@@ -610,7 +606,7 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 514:  // deleted_rows_count
+      case ManifestFile::kDeletedRowsCountFieldId:
         if (file.deleted_rows_count.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, 
file.deleted_rows_count.value()));
         } else {
@@ -618,20 +614,19 @@ Status ManifestFileAdapter::AppendInternal(const 
ManifestFile& file) {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 507:  // partitions
+      case ManifestFile::kPartitionSummaryFieldId:
         ICEBERG_RETURN_UNEXPECTED(AppendPartitionSummary(
             array, internal::checked_pointer_cast<ListType>(field.type()),
             file.partitions));
         break;
-      case 519:  // key_metadata
+      case ManifestFile::kKeyMetadataFieldId:
         if (!file.key_metadata.empty()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, file.key_metadata));
         } else {
           ICEBERG_NANOARROW_RETURN_UNEXPECTED(ArrowArrayAppendNull(array, 1));
         }
         break;
-      case 520: {
-        // first_row_id
+      case ManifestFile::kFirstRowIdFieldId: {
         ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, GetFirstRowId(file));
         if (first_row_id.has_value()) {
           ICEBERG_RETURN_UNEXPECTED(AppendField(array, first_row_id.value()));
diff --git a/src/iceberg/manifest/manifest_entry.h 
b/src/iceberg/manifest/manifest_entry.h
index 5d35530b..1e4af044 100644
--- a/src/iceberg/manifest/manifest_entry.h
+++ b/src/iceberg/manifest/manifest_entry.h
@@ -308,11 +308,30 @@ struct ICEBERG_EXPORT ManifestEntry {
   /// null.
   std::optional<int64_t> snapshot_id;
   /// Field id: 3
-  /// Data sequence number of the file. Inherited when null and status is 1 
(added).
+  /// Data sequence number of the file.
+  ///
+  /// Independently of the entry status, it represents the sequence number to 
which the
+  /// file should apply. Note the data sequence number may differ from the 
sequence number
+  /// of the snapshot in which the underlying file was added. New snapshots 
can add files
+  /// that belong to older sequence numbers (e.g. compaction). The data 
sequence number
+  /// also does not change when the file is marked as deleted.
+  ///
+  /// \note It can return nullopt if the data sequence number is unknown. This 
may happen
+  /// while reading a v2 manifest that did not persist the data sequence 
number for
+  /// manifest entries with status DELETED (older Iceberg versions).
   std::optional<int64_t> sequence_number;
   /// Field id: 4
-  /// File sequence number indicating when the file was added. Inherited when 
null and
-  /// status is 1 (added).
+  /// The file sequence number.
+  ///
+  /// The file sequence number represents the sequence number of the snapshot 
in which the
+  /// underlying file was added. The file sequence number is always assigned 
at commit and
+  /// cannot be provided explicitly, unlike the data sequence number. The file 
sequence
+  /// number does not change upon assigning and must be preserved in existing 
and deleted
+  /// entries.
+  ///
+  /// \note It can return nullopt if the file sequence number is unknown. This 
may happen
+  /// while reading a v2 manifest that did not persist the file sequence 
number for
+  /// manifest entries with status EXISTING or DELETED (older Iceberg 
versions).
   std::optional<int64_t> file_sequence_number;
   /// Field id: 2
   /// File path, partition tuple, metrics, ...
diff --git a/src/iceberg/manifest/manifest_group.cc 
b/src/iceberg/manifest/manifest_group.cc
new file mode 100644
index 00000000..220b8585
--- /dev/null
+++ b/src/iceberg/manifest/manifest_group.cc
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_group.h"
+
+#include <utility>
+
+#include "iceberg/expression/evaluator.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/expression/projections.h"
+#include "iceberg/expression/residual_evaluator.h"
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/util/checked_cast.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
+    std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+    std::vector<ManifestFile> manifests) {
+  std::vector<ManifestFile> data_manifests;
+  std::vector<ManifestFile> delete_manifests;
+  for (auto& manifest : manifests) {
+    if (manifest.content == ManifestContent::kData) {
+      data_manifests.push_back(std::move(manifest));
+    } else if (manifest.content == ManifestContent::kDeletes) {
+      delete_manifests.push_back(std::move(manifest));
+    }
+  }
+
+  return ManifestGroup::Make(std::move(io), std::move(schema), std::move(specs_by_id),
+                             std::move(data_manifests), std::move(delete_manifests));
+}
+
+Result<std::unique_ptr<ManifestGroup>> ManifestGroup::Make(
+    std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+    std::vector<ManifestFile> data_manifests,
+    std::vector<ManifestFile> delete_manifests) {
+  // DeleteFileIndex::Builder validates all input parameters so we skip 
validation here
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto delete_index_builder,
+      DeleteFileIndex::BuilderFor(io, schema, specs_by_id, 
std::move(delete_manifests)));
+  return std::unique_ptr<ManifestGroup>(
+      new ManifestGroup(std::move(io), std::move(schema), 
std::move(specs_by_id),
+                        std::move(data_manifests), 
std::move(delete_index_builder)));
+}
+
+ManifestGroup::ManifestGroup(
+    std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+    std::vector<ManifestFile> data_manifests,
+    DeleteFileIndex::Builder&& delete_index_builder)
+    : io_(std::move(io)),
+      schema_(std::move(schema)),
+      specs_by_id_(std::move(specs_by_id)),
+      data_manifests_(std::move(data_manifests)),
+      delete_index_builder_(std::move(delete_index_builder)),
+      data_filter_(True::Instance()),
+      file_filter_(True::Instance()),
+      partition_filter_(True::Instance()),
+      manifest_entry_predicate_([](const ManifestEntry&) { return true; }) {}
+
+ManifestGroup::~ManifestGroup() = default;
+
+ManifestGroup::ManifestGroup(ManifestGroup&&) noexcept = default;
+ManifestGroup& ManifestGroup::operator=(ManifestGroup&&) noexcept = default;
+
+ManifestGroup& ManifestGroup::FilterData(std::shared_ptr<Expression> filter) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(data_filter_, And::Make(data_filter_, 
filter));
+  delete_index_builder_.DataFilter(std::move(filter));
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::FilterFiles(std::shared_ptr<Expression> filter) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(file_filter_,
+                                   And::Make(file_filter_, std::move(filter)));
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::FilterPartitions(std::shared_ptr<Expression> 
filter) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(partition_filter_,
+                                   And::Make(partition_filter_, filter));
+  delete_index_builder_.PartitionFilter(std::move(filter));
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::FilterManifestEntries(
+    std::function<bool(const ManifestEntry&)> predicate) {
+  manifest_entry_predicate_ = [old_predicate = 
std::move(manifest_entry_predicate_),
+                               predicate =
+                                   std::move(predicate)](const ManifestEntry& 
entry) {
+    return old_predicate(entry) && predicate(entry);
+  };
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::IgnoreDeleted() {
+  ignore_deleted_ = true;
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::IgnoreExisting() {
+  ignore_existing_ = true;
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::IgnoreResiduals() {
+  ignore_residuals_ = true;
+  delete_index_builder_.IgnoreResiduals();
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::Select(std::vector<std::string> columns) {
+  columns_ = std::move(columns);
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  delete_index_builder_.CaseSensitive(case_sensitive);
+  return *this;
+}
+
+ManifestGroup& ManifestGroup::ColumnsToKeepStats(std::unordered_set<int32_t> 
column_ids) {
+  columns_to_keep_stats_ = std::move(column_ids);
+  return *this;
+}
+
+Result<std::vector<std::shared_ptr<FileScanTask>>> ManifestGroup::PlanFiles() {
+  auto create_file_scan_tasks =
+      [this](std::vector<ManifestEntry>&& entries,
+             const TaskContext& ctx) -> 
Result<std::vector<std::shared_ptr<ScanTask>>> {
+    std::vector<std::shared_ptr<ScanTask>> tasks;
+    tasks.reserve(entries.size());
+
+    for (auto& entry : entries) {
+      if (ctx.drop_stats) {
+        ContentFileUtil::DropAllStats(*entry.data_file);
+      } else if (!ctx.columns_to_keep_stats.empty()) {
+        ContentFileUtil::DropUnselectedStats(*entry.data_file, 
ctx.columns_to_keep_stats);
+      }
+      ICEBERG_ASSIGN_OR_RAISE(auto delete_files, ctx.deletes->ForEntry(entry));
+      ICEBERG_ASSIGN_OR_RAISE(auto residual,
+                              
ctx.residuals->ResidualFor(entry.data_file->partition));
+      tasks.push_back(std::make_shared<FileScanTask>(
+          std::move(entry.data_file), std::move(delete_files), 
std::move(residual)));
+    }
+
+    return tasks;
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto tasks, Plan(create_file_scan_tasks));
+
+  // Convert ScanTask to FileScanTask
+  std::vector<std::shared_ptr<FileScanTask>> file_tasks;
+  file_tasks.reserve(tasks.size());
+  for (auto& task : tasks) {
+    file_tasks.push_back(internal::checked_pointer_cast<FileScanTask>(task));
+  }
+  return file_tasks;
+}
+
+Result<std::vector<std::shared_ptr<ScanTask>>> ManifestGroup::Plan(
+    const CreateTasksFunction& create_tasks) {
+  std::unordered_map<int32_t, std::shared_ptr<ResidualEvaluator>> 
residual_cache;
+  auto get_residual_evaluator = [&](int32_t spec_id) -> 
Result<ResidualEvaluator*> {
+    if (residual_cache.contains(spec_id)) {
+      return residual_cache[spec_id].get();
+    }
+
+    auto spec_iter = specs_by_id_.find(spec_id);
+    ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+                  "Cannot find partition spec for ID {}", spec_id);
+
+    const auto& spec = spec_iter->second;
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto residual_evaluator,
+        ResidualEvaluator::Make((ignore_residuals_ ? True::Instance() : 
data_filter_),
+                                *spec, *schema_, case_sensitive_));
+    residual_cache[spec_id] = std::move(residual_evaluator);
+
+    return residual_cache[spec_id].get();
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto delete_index, delete_index_builder_.Build());
+
+  bool drop_stats = ManifestReader::ShouldDropStats(columns_);
+  if (delete_index->has_equality_deletes()) {
+    columns_ = ManifestReader::WithStatsColumns(columns_);
+  }
+
+  std::unordered_map<int32_t, std::unique_ptr<TaskContext>> task_context_cache;
+  auto get_task_context = [&](int32_t spec_id) -> Result<TaskContext*> {
+    if (task_context_cache.contains(spec_id)) {
+      return task_context_cache[spec_id].get();
+    }
+
+    auto spec_iter = specs_by_id_.find(spec_id);
+    ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+                  "Cannot find partition spec for ID {}", spec_id);
+
+    const auto& spec = spec_iter->second;
+    ICEBERG_ASSIGN_OR_RAISE(auto residuals, get_residual_evaluator(spec_id));
+    task_context_cache[spec_id] = std::make_unique<TaskContext>(
+        TaskContext{.spec = spec,
+                    .deletes = delete_index.get(),
+                    .residuals = residuals,
+                    .drop_stats = drop_stats,
+                    .columns_to_keep_stats = columns_to_keep_stats_});
+
+    return task_context_cache[spec_id].get();
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto entry_groups, ReadEntries());
+
+  std::vector<std::shared_ptr<ScanTask>> all_tasks;
+  for (auto& [spec_id, entries] : entry_groups) {
+    ICEBERG_ASSIGN_OR_RAISE(auto ctx, get_task_context(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto tasks, create_tasks(std::move(entries), 
*ctx));
+    all_tasks.insert(all_tasks.end(), std::make_move_iterator(tasks.begin()),
+                     std::make_move_iterator(tasks.end()));
+  }
+
+  return all_tasks;
+}
+
+Result<std::vector<ManifestEntry>> ManifestGroup::Entries() {
+  ICEBERG_ASSIGN_OR_RAISE(auto entry_groups, ReadEntries());
+
+  std::vector<ManifestEntry> all_entries;
+  for (auto& [_, entries] : entry_groups) {
+    all_entries.insert(all_entries.end(), 
std::make_move_iterator(entries.begin()),
+                       std::make_move_iterator(entries.end()));
+  }
+
+  return all_entries;
+}
+
+Result<std::unique_ptr<ManifestReader>> ManifestGroup::MakeReader(
+    const ManifestFile& manifest) {
+  auto spec_it = specs_by_id_.find(manifest.partition_spec_id);
+  if (spec_it == specs_by_id_.end()) {
+    return InvalidArgument("Partition spec {} not found for manifest {}",
+                           manifest.partition_spec_id, manifest.manifest_path);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                          ManifestReader::Make(manifest, io_, schema_, 
spec_it->second));
+
+  reader->FilterRows(data_filter_)
+      .FilterPartitions(partition_filter_)
+      .CaseSensitive(case_sensitive_)
+      .Select(columns_);
+
+  return reader;
+}
+
+Result<std::unordered_map<int32_t, std::vector<ManifestEntry>>>
+ManifestGroup::ReadEntries() {
+  std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>> eval_cache;
+  auto get_manifest_evaluator = [&](int32_t spec_id) -> 
Result<ManifestEvaluator*> {
+    if (eval_cache.contains(spec_id)) {
+      return eval_cache[spec_id].get();
+    }
+
+    auto spec_iter = specs_by_id_.find(spec_id);
+    ICEBERG_CHECK(spec_iter != specs_by_id_.cend(),
+                  "Cannot find partition spec for ID {}", spec_id);
+
+    const auto& spec = spec_iter->second;
+    auto projector = Projections::Inclusive(*spec, *schema_, case_sensitive_);
+    ICEBERG_ASSIGN_OR_RAISE(auto partition_filter, 
projector->Project(data_filter_));
+    ICEBERG_ASSIGN_OR_RAISE(partition_filter,
+                            And::Make(partition_filter, partition_filter_));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto manifest_evaluator,
+        ManifestEvaluator::MakePartitionFilter(std::move(partition_filter), 
spec,
+                                               *schema_, case_sensitive_));
+    eval_cache[spec_id] = std::move(manifest_evaluator);
+
+    return eval_cache[spec_id].get();
+  };
+
+  std::unique_ptr<Evaluator> data_file_evaluator;
+  if (file_filter_ && file_filter_->op() != Expression::Operation::kTrue) {
+    // TODO(gangwu): create an Evaluator on the DataFile schema with empty
+    // partition type
+  }
+
+  std::unordered_map<int32_t, std::vector<ManifestEntry>> result;
+
+  // TODO(gangwu): Parallelize reading manifests
+  for (const auto& manifest : data_manifests_) {
+    const int32_t spec_id = manifest.partition_spec_id;
+
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_evaluator, 
get_manifest_evaluator(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(bool should_match, 
manifest_evaluator->Evaluate(manifest));
+    if (!should_match) {
+      // Skip this manifest because it doesn't match partition filter
+      continue;
+    }
+
+    if (ignore_deleted_) {
+      // only scan manifests that have entries other than deletes
+      if (!manifest.has_added_files() && !manifest.has_existing_files()) {
+        continue;
+      }
+    }
+
+    if (ignore_existing_) {
+      // only scan manifests that have entries other than existing
+      if (!manifest.has_added_files() && !manifest.has_deleted_files()) {
+        continue;
+      }
+    }
+
+    // Read manifest entries
+    ICEBERG_ASSIGN_OR_RAISE(auto reader, MakeReader(manifest));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries,
+                            ignore_deleted_ ? reader->LiveEntries() : 
reader->Entries());
+
+    for (auto& entry : entries) {
+      if (ignore_existing_ && entry.status == ManifestStatus::kExisting) {
+        continue;
+      }
+
+      if (data_file_evaluator != nullptr) {
+        // TODO(gangwu): implement data_file_evaluator to evaluate StructLike 
on
+        // top of entry.data_file
+      }
+
+      if (!manifest_entry_predicate_(entry)) {
+        continue;
+      }
+
+      result[spec_id].push_back(std::move(entry));
+    }
+  }
+
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_group.h 
b/src/iceberg/manifest/manifest_group.h
new file mode 100644
index 00000000..10b55278
--- /dev/null
+++ b/src/iceberg/manifest/manifest_group.h
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_group.h
+/// Coordinates reading manifest files and producing scan tasks.
+
+#include <functional>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/delete_file_index.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
+
+namespace iceberg {
+
+/// \brief Context passed to task creation functions.
+struct ICEBERG_EXPORT TaskContext {
+ public:
+  std::shared_ptr<PartitionSpec> spec;
+  DeleteFileIndex* deletes;
+  ResidualEvaluator* residuals;
+  bool drop_stats;
+  std::unordered_set<int32_t> columns_to_keep_stats;
+};
+
+/// \brief Coordinates reading manifest files and producing scan tasks.
+class ICEBERG_EXPORT ManifestGroup : public ErrorCollector {
+ public:
+  /// \brief Construct a ManifestGroup with a list of manifests.
+  ///
+  /// \param io FileIO for reading manifest files.
+  /// \param schema Current table schema.
+  /// \param specs_by_id Mapping of partition spec ID to PartitionSpec.
+  /// \param manifests List of manifest files to process.
+  static Result<std::unique_ptr<ManifestGroup>> Make(
+      std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+      std::vector<ManifestFile> manifests);
+
+  /// \brief Construct a ManifestGroup with pre-separated manifests.
+  ///
+  /// \param io FileIO for reading manifest files.
+  /// \param schema Current table schema.
+  /// \param specs_by_id Mapping of partition spec ID to PartitionSpec.
+  /// \param data_manifests List of data manifest files.
+  /// \param delete_manifests List of delete manifest files.
+  static Result<std::unique_ptr<ManifestGroup>> Make(
+      std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+      std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id,
+      std::vector<ManifestFile> data_manifests,
+      std::vector<ManifestFile> delete_manifests);
+
+  ~ManifestGroup() override;
+
+  ManifestGroup(ManifestGroup&&) noexcept;
+  ManifestGroup& operator=(ManifestGroup&&) noexcept;
+  ManifestGroup(const ManifestGroup&) = delete;
+  ManifestGroup& operator=(const ManifestGroup&) = delete;
+
+  /// \brief Set a row-level data filter.
+  ManifestGroup& FilterData(std::shared_ptr<Expression> filter);
+
+  /// \brief Set a filter that is evaluated against each DataFile's metadata.
+  ManifestGroup& FilterFiles(std::shared_ptr<Expression> filter);
+
+  /// \brief Set a partition filter expression.
+  ManifestGroup& FilterPartitions(std::shared_ptr<Expression> filter);
+
+  /// \brief Set a custom manifest entry filter predicate.
+  ///
+  /// \param predicate A function that returns true if the entry should be 
included.
+  ManifestGroup& FilterManifestEntries(
+      std::function<bool(const ManifestEntry&)> predicate);
+
+  /// \brief Ignore deleted entries in manifests.
+  ManifestGroup& IgnoreDeleted();
+
+  /// \brief Ignore existing entries in manifests.
+  ManifestGroup& IgnoreExisting();
+
+  /// \brief Ignore residual filter computation.
+  ManifestGroup& IgnoreResiduals();
+
+  /// \brief Select specific columns from manifest entries.
+  ///
+  /// \param columns Column names to select from manifest entries.
+  ManifestGroup& Select(std::vector<std::string> columns);
+
+  /// \brief Set case sensitivity for column name matching.
+  ManifestGroup& CaseSensitive(bool case_sensitive);
+
+  /// \brief Specify columns that should retain their statistics.
+  ///
+  /// \param column_ids Field IDs of columns whose statistics should be 
preserved.
+  ManifestGroup& ColumnsToKeepStats(std::unordered_set<int32_t> column_ids);
+
+  /// \brief Plan scan tasks for all matching data files.
+  Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles();
+
+  /// \brief Get all matching manifest entries.
+  Result<std::vector<ManifestEntry>> Entries();
+
+  using CreateTasksFunction =
+      std::function<Result<std::vector<std::shared_ptr<ScanTask>>>(
+          std::vector<ManifestEntry>&&, const TaskContext&)>;
+
+  /// \brief Plan tasks using a custom task creation function.
+  ///
+  /// \param create_tasks A function that creates ScanTasks from entries and 
context.
+  /// \return A list of ScanTask objects, or error on failure.
+  Result<std::vector<std::shared_ptr<ScanTask>>> Plan(
+      const CreateTasksFunction& create_tasks);
+
+ private:
+  ManifestGroup(std::shared_ptr<FileIO> io, std::shared_ptr<Schema> schema,
+                std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> 
specs_by_id,
+                std::vector<ManifestFile> data_manifests,
+                DeleteFileIndex::Builder&& delete_index_builder);
+
+  Result<std::unordered_map<int32_t, std::vector<ManifestEntry>>> 
ReadEntries();
+
+  Result<std::unique_ptr<ManifestReader>> MakeReader(const ManifestFile& 
manifest);
+
+  std::shared_ptr<FileIO> io_;
+  std::shared_ptr<Schema> schema_;
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
+  std::vector<ManifestFile> data_manifests_;
+  DeleteFileIndex::Builder delete_index_builder_;
+  std::shared_ptr<Expression> data_filter_;
+  std::shared_ptr<Expression> file_filter_;
+  std::shared_ptr<Expression> partition_filter_;
+  std::function<bool(const ManifestEntry&)> manifest_entry_predicate_;
+  std::vector<std::string> columns_;
+  std::unordered_set<int32_t> columns_to_keep_stats_;
+  bool case_sensitive_ = true;
+  bool ignore_deleted_ = false;
+  bool ignore_existing_ = false;
+  bool ignore_residuals_ = false;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_writer.cc 
b/src/iceberg/manifest/manifest_writer.cc
index c30f9f75..649797fe 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -21,6 +21,7 @@
 
 #include <optional>
 
+#include "iceberg/constants.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
 #include "iceberg/manifest/v1_metadata_internal.h"
@@ -233,11 +234,12 @@ Result<ManifestFile> ManifestWriter::ToManifestFile() 
const {
       .manifest_length = manifest_length,
       .partition_spec_id = adapter_->partition_spec()->spec_id(),
       .content = adapter_->content(),
-      // sequence_number and min_sequence_number with kInvalidSequenceNumber 
will be
-      // replace with real sequence number in `ManifestListWriter`.
-      .sequence_number = TableMetadata::kInvalidSequenceNumber,
-      .min_sequence_number =
-          min_sequence_number_.value_or(TableMetadata::kInvalidSequenceNumber),
+      // sequence_number with kUnassignedSequenceNumber will be assigned when 
committed.
+      .sequence_number = kUnassignedSequenceNumber,
+      // if the min_sequence_number_ is missing, then no manifests with a 
sequence number
+      // have been written, so the min data sequence number is the one that 
will be
+      // assigned when this is committed. pass kUnassignedSequenceNumber to 
inherit it.
+      .min_sequence_number = 
min_sequence_number_.value_or(kUnassignedSequenceNumber),
       .added_snapshot_id = 
adapter_->snapshot_id().value_or(kInvalidSnapshotId),
       .added_files_count = add_files_count_,
       .existing_files_count = existing_files_count_,
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index 00f93c59..41e685ff 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -18,6 +18,7 @@
 install_headers(
     [
         'manifest_entry.h',
+        'manifest_group.h',
         'manifest_list.h',
         'manifest_reader.h',
         'manifest_writer.h',
diff --git a/src/iceberg/manifest/v2_metadata.cc 
b/src/iceberg/manifest/v2_metadata.cc
index 737fa62f..e5c54fca 100644
--- a/src/iceberg/manifest/v2_metadata.cc
+++ b/src/iceberg/manifest/v2_metadata.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 
+#include "iceberg/constants.h"
 #include "iceberg/json_internal.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
@@ -157,7 +158,7 @@ Status ManifestFileAdapterV2::Append(const ManifestFile& 
file) {
 }
 
 Result<int64_t> ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& 
file) const {
-  if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+  if (file.sequence_number == kUnassignedSequenceNumber) {
     // if the sequence number is being assigned here, then the manifest must 
be created by
     // the current operation. to validate this, check that the snapshot id 
matches the
     // current commit
@@ -173,7 +174,7 @@ Result<int64_t> 
ManifestFileAdapterV2::GetSequenceNumber(const ManifestFile& fil
 
 Result<int64_t> ManifestFileAdapterV2::GetMinSequenceNumber(
     const ManifestFile& file) const {
-  if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+  if (file.min_sequence_number == kUnassignedSequenceNumber) {
     // same sanity check as above
     if (snapshot_id_ != file.added_snapshot_id) {
       return InvalidManifestList(
diff --git a/src/iceberg/manifest/v3_metadata.cc 
b/src/iceberg/manifest/v3_metadata.cc
index 87d4d77d..35a1a89d 100644
--- a/src/iceberg/manifest/v3_metadata.cc
+++ b/src/iceberg/manifest/v3_metadata.cc
@@ -20,6 +20,7 @@
 #include <memory>
 #include <optional>
 
+#include "iceberg/constants.h"
 #include "iceberg/json_internal.h"
 #include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/manifest/manifest_list.h"
@@ -205,7 +206,7 @@ Status ManifestFileAdapterV3::Append(const ManifestFile& 
file) {
 }
 
 Result<int64_t> ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& 
file) const {
-  if (file.sequence_number == TableMetadata::kInvalidSequenceNumber) {
+  if (file.sequence_number == kUnassignedSequenceNumber) {
     // if the sequence number is being assigned here, then the manifest must 
be created by
     // the current operation. to validate this, check that the snapshot id 
matches the
     // current commit
@@ -221,7 +222,7 @@ Result<int64_t> 
ManifestFileAdapterV3::GetSequenceNumber(const ManifestFile& fil
 
 Result<int64_t> ManifestFileAdapterV3::GetMinSequenceNumber(
     const ManifestFile& file) const {
-  if (file.min_sequence_number == TableMetadata::kInvalidSequenceNumber) {
+  if (file.min_sequence_number == kUnassignedSequenceNumber) {
     // same sanity check as above
     if (snapshot_id_ != file.added_snapshot_id) {
       return InvalidManifestList(
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 2b344dc1..55349d8d 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -63,6 +63,7 @@ iceberg_sources = files(
     'json_internal.cc',
     'manifest/manifest_adapter.cc',
     'manifest/manifest_entry.cc',
+    'manifest/manifest_group.cc',
     'manifest/manifest_list.cc',
     'manifest/manifest_reader.cc',
     'manifest/manifest_writer.cc',
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 6918de82..700cab1f 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -31,7 +31,6 @@
 #include "iceberg/schema_field.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/table_metadata.h"
-#include "iceberg/type.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
@@ -136,12 +135,29 @@ Result<ArrowArrayStream> 
MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
 
 }  // namespace
 
-// implement FileScanTask
-FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file)
-    : data_file_(std::move(data_file)) {}
+// FileScanTask implementation
+
+FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
+                           std::vector<std::shared_ptr<DataFile>> delete_files,
+                           std::shared_ptr<Expression> residual_filter)
+    : data_file_(std::move(data_file)),
+      delete_files_(std::move(delete_files)),
+      residual_filter_(std::move(residual_filter)) {}
 
 const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return 
data_file_; }
 
+const std::vector<std::shared_ptr<DataFile>>& FileScanTask::delete_files() 
const {
+  return delete_files_;
+}
+
+const std::shared_ptr<Expression>& FileScanTask::residual_filter() const {
+  return residual_filter_;
+}
+
+bool FileScanTask::has_deletes() const { return !delete_files_.empty(); }
+
+bool FileScanTask::has_residual_filter() const { return residual_filter_ != 
nullptr; }
+
 int64_t FileScanTask::size_bytes() const { return 
data_file_->file_size_in_bytes; }
 
 int32_t FileScanTask::files_count() const { return 1; }
@@ -151,6 +167,10 @@ int64_t FileScanTask::estimated_row_count() const { return 
data_file_->record_co
 Result<ArrowArrayStream> FileScanTask::ToArrow(
     const std::shared_ptr<FileIO>& io, const std::shared_ptr<Schema>& 
projected_schema,
     const std::shared_ptr<Expression>& filter) const {
+  if (has_deletes()) {
+    return NotSupported("Reading data files with delete files is not yet 
supported.");
+  }
+
   const ReaderOptions options{.path = data_file_->file_path,
                               .length = data_file_->file_size_in_bytes,
                               .io = io,
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index c3b1b28f..4f2ddfde 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -46,11 +46,30 @@ class ICEBERG_EXPORT ScanTask {
 /// \brief Task representing a data file and its corresponding delete files.
 class ICEBERG_EXPORT FileScanTask : public ScanTask {
  public:
-  explicit FileScanTask(std::shared_ptr<DataFile> data_file);
+  /// \brief Construct with data file, delete files, and residual filter.
+  ///
+  /// \param data_file The data file to read.
+  /// \param delete_files Delete files that apply to this data file.
+  /// \param residual_filter Optional residual filter to apply after reading.
+  explicit FileScanTask(std::shared_ptr<DataFile> data_file,
+                        std::vector<std::shared_ptr<DataFile>> delete_files = 
{},
+                        std::shared_ptr<Expression> residual_filter = nullptr);
 
   /// \brief The data file that should be read by this scan task.
   const std::shared_ptr<DataFile>& data_file() const;
 
+  /// \brief Delete files that apply to this data file.
+  const std::vector<std::shared_ptr<DataFile>>& delete_files() const;
+
+  /// \brief Residual filter to apply after reading.
+  const std::shared_ptr<Expression>& residual_filter() const;
+
+  /// \brief Check if any deletes need to be applied.
+  bool has_deletes() const;
+
+  /// \brief Check if a residual filter needs to be applied.
+  bool has_residual_filter() const;
+
   int64_t size_bytes() const override;
   int32_t files_count() const override;
   int64_t estimated_row_count() const override;
@@ -70,6 +89,10 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
  private:
   /// \brief Data file metadata.
   std::shared_ptr<DataFile> data_file_;
+  /// \brief Delete files that apply to this data file.
+  std::vector<std::shared_ptr<DataFile>> delete_files_;
+  /// \brief Residual filter to apply after reading.
+  std::shared_ptr<Expression> residual_filter_;
 };
 
 /// \brief Scan context holding snapshot and scan-specific metadata.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 731fe0af..a32bbe4d 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -143,6 +143,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    USE_BUNDLE
                    SOURCES
                    delete_file_index_test.cc
+                   manifest_group_test.cc
                    manifest_list_versions_test.cc
                    manifest_reader_stats_test.cc
                    manifest_reader_test.cc
diff --git a/src/iceberg/test/delete_file_index_test.cc 
b/src/iceberg/test/delete_file_index_test.cc
index e091f5ea..8c87772e 100644
--- a/src/iceberg/test/delete_file_index_test.cc
+++ b/src/iceberg/test/delete_file_index_test.cc
@@ -198,9 +198,9 @@ class DeleteFileIndexTest : public 
testing::TestWithParam<int> {
   Result<std::unique_ptr<DeleteFileIndex>> BuildIndex(
       std::vector<ManifestFile> delete_manifests,
       std::optional<int64_t> after_sequence_number = std::nullopt) {
-    ICEBERG_ASSIGN_OR_RAISE(
-        auto builder, DeleteFileIndex::BuilderFor(file_io_, 
std::move(delete_manifests)));
-    builder.SpecsById(GetSpecsById()).WithSchema(schema_);
+    ICEBERG_ASSIGN_OR_RAISE(auto builder,
+                            DeleteFileIndex::BuilderFor(file_io_, schema_, 
GetSpecsById(),
+                                                        
std::move(delete_manifests)));
     if (after_sequence_number.has_value()) {
       builder.AfterSequenceNumber(after_sequence_number.value());
     }
diff --git a/src/iceberg/test/manifest_group_test.cc 
b/src/iceberg/test/manifest_group_test.cc
new file mode 100644
index 00000000..32466c22
--- /dev/null
+++ b/src/iceberg/test/manifest_group_test.cc
@@ -0,0 +1,521 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_group.h"
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+class ManifestGroupTest : public testing::TestWithParam<int> {
+ protected:
+  void SetUp() override {
+    avro::RegisterAll();
+
+    file_io_ = arrow::MakeMockFileIO();
+
+    // Schema with id and data fields
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
+        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
+
+    // Partitioned spec: bucket by data
+    ICEBERG_UNWRAP_OR_FAIL(
+        partitioned_spec_,
+        PartitionSpec::Make(
+            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
+                                           "data_bucket_16_2", 
Transform::Bucket(16))}));
+
+    // Unpartitioned spec
+    unpartitioned_spec_ = PartitionSpec::Unpartitioned();
+  }
+
+  std::string MakeManifestPath() {
+    static int counter = 0;
+    return std::format("manifest-{}-{}.avro", counter++,
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
+                                         const PartitionValues& partition,
+                                         int32_t spec_id, int64_t record_count 
= 1) {
+    return std::make_shared<DataFile>(DataFile{
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = record_count,
+        .file_size_in_bytes = 10,
+        .sort_order_id = 0,
+        .partition_spec_id = spec_id,
+    });
+  }
+
+  std::shared_ptr<DataFile> MakePositionDeleteFile(
+      const std::string& path, const PartitionValues& partition, int32_t 
spec_id,
+      std::optional<std::string> referenced_file = std::nullopt) {
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kPositionDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .partition_spec_id = spec_id,
+        .referenced_data_file = referenced_file,
+    });
+  }
+
+  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+                                                   const PartitionValues& 
partition,
+                                                   int32_t spec_id,
+                                                   std::vector<int> 
equality_ids = {1}) {
+    return std::make_shared<DataFile>(DataFile{
+        .content = DataFile::Content::kEqualityDeletes,
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = partition,
+        .record_count = 1,
+        .file_size_in_bytes = 10,
+        .equality_ids = std::move(equality_ids),
+        .partition_spec_id = spec_id,
+    });
+  }
+
+  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
+                          int64_t sequence_number, std::shared_ptr<DataFile> 
file) {
+    return ManifestEntry{
+        .status = status,
+        .snapshot_id = snapshot_id,
+        .sequence_number = sequence_number,
+        .file_sequence_number = sequence_number,
+        .data_file = std::move(file),
+    };
+  }
+
+  ManifestFile WriteDataManifest(int format_version, int64_t snapshot_id,
+                                 std::vector<ManifestEntry> entries,
+                                 std::shared_ptr<PartitionSpec> spec) {
+    const std::string manifest_path = MakeManifestPath();
+
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 1) {
+      writer_result = ManifestWriter::MakeV1Writer(snapshot_id, manifest_path, 
file_io_,
+                                                   spec, schema_);
+    } else if (format_version == 2) {
+      writer_result = ManifestWriter::MakeV2Writer(snapshot_id, manifest_path, 
file_io_,
+                                                   spec, schema_, 
ManifestContent::kData);
+    } else if (format_version == 3) {
+      writer_result =
+          ManifestWriter::MakeV3Writer(snapshot_id, /*first_row_id=*/0L, 
manifest_path,
+                                       file_io_, spec, schema_, 
ManifestContent::kData);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  ManifestFile WriteDeleteManifest(int format_version, int64_t snapshot_id,
+                                   std::vector<ManifestEntry> entries,
+                                   std::shared_ptr<PartitionSpec> spec) {
+    const std::string manifest_path = MakeManifestPath();
+
+    Result<std::unique_ptr<ManifestWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 2) {
+      writer_result = ManifestWriter::MakeV2Writer(
+          snapshot_id, manifest_path, file_io_, spec, schema_, 
ManifestContent::kDeletes);
+    } else if (format_version == 3) {
+      writer_result = ManifestWriter::MakeV3Writer(
+          snapshot_id, /*first_row_id=*/std::nullopt, manifest_path, file_io_, 
spec,
+          schema_, ManifestContent::kDeletes);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
+    return {{partitioned_spec_->spec_id(), partitioned_spec_},
+            {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
+  }
+
+  std::string MakeManifestListPath() {
+    static int counter = 0;
+    return std::format("manifest-list-{}-{}.avro", counter++,
+                       
std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  // Write a ManifestFile to a manifest list and read it back. This is useful 
for v1
+  // to populate all missing fields like sequence_number.
+  ManifestFile WriteAndReadManifestListEntry(int format_version, int64_t 
snapshot_id,
+                                             int64_t sequence_number,
+                                             const ManifestFile& manifest) {
+    const std::string manifest_list_path = MakeManifestListPath();
+    constexpr int64_t kParentSnapshotId = 0L;
+    constexpr int64_t kSnapshotFirstRowId = 0L;
+
+    Result<std::unique_ptr<ManifestListWriter>> writer_result =
+        NotSupported("Format version: {}", format_version);
+
+    if (format_version == 1) {
+      writer_result = ManifestListWriter::MakeV1Writer(snapshot_id, 
kParentSnapshotId,
+                                                       manifest_list_path, 
file_io_);
+    } else if (format_version == 2) {
+      writer_result = ManifestListWriter::MakeV2Writer(
+          snapshot_id, kParentSnapshotId, sequence_number, manifest_list_path, 
file_io_);
+    } else if (format_version == 3) {
+      writer_result = ManifestListWriter::MakeV3Writer(
+          snapshot_id, kParentSnapshotId, sequence_number, kSnapshotFirstRowId,
+          manifest_list_path, file_io_);
+    }
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+    EXPECT_THAT(writer->Add(manifest), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    auto reader_result = ManifestListReader::Make(manifest_list_path, 
file_io_);
+    EXPECT_THAT(reader_result, IsOk());
+    auto reader = std::move(reader_result.value());
+    auto files_result = reader->Files();
+    EXPECT_THAT(files_result, IsOk());
+
+    auto manifests = files_result.value();
+    EXPECT_EQ(manifests.size(), 1);
+    return manifests[0];
+  }
+
+  static std::vector<std::string> GetPaths(
+      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
+    return tasks | std::views::transform([](const auto& task) {
+             return task->data_file()->file_path;
+           }) |
+           std::ranges::to<std::vector<std::string>>();
+  }
+
+  static std::vector<std::string> GetEntryPaths(
+      const std::vector<ManifestEntry>& entries) {
+    return entries | std::views::transform([](const auto& entry) {
+             return entry.data_file->file_path;
+           }) |
+           std::ranges::to<std::vector<std::string>>();
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> partitioned_spec_;
+  std::shared_ptr<PartitionSpec> unpartitioned_spec_;
+};
+
+TEST_P(ManifestGroupTest, CreateAndGetEntries) {
+  int version = GetParam();
+  if (version < 2) {
+    GTEST_SKIP() << "Delete files only supported in V2+";
+  }
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifests
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data1.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data2.parquet", part_value,
+                             partitioned_spec_->spec_id()))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  // Create delete manifest
+  std::vector<ManifestEntry> delete_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId,
+                /*sequence_number=*/2,
+                MakePositionDeleteFile("/path/to/delete.parquet", part_value,
+                                       partitioned_spec_->spec_id()))};
+  auto delete_manifest = WriteDeleteManifest(
+      version, kSnapshotId, std::move(delete_entries), partitioned_spec_);
+
+  // Create ManifestGroup with pre-separated manifests
+  std::vector<ManifestFile> data_manifests = {data_manifest};
+  std::vector<ManifestFile> delete_manifests = {delete_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(data_manifests),
+                          std::move(delete_manifests)));
+
+  // Verify Entries() returns only data file entries
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  ASSERT_EQ(entries.size(), 2);
+  EXPECT_THAT(
+      GetEntryPaths(entries),
+      testing::UnorderedElementsAre("/path/to/data1.parquet", 
"/path/to/data2.parquet"));
+
+  // Verify PlanFiles returns data files with associated delete files
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             
"/path/to/data2.parquet"));
+  EXPECT_EQ(tasks[0]->delete_files().size(), 1);
+  EXPECT_EQ(tasks[1]->delete_files()[0]->file_path, "/path/to/delete.parquet");
+  EXPECT_EQ(tasks[1]->delete_files().size(), 1);
+  EXPECT_EQ(tasks[1]->delete_files()[0]->file_path, "/path/to/delete.parquet");
+}
+
+TEST_P(ManifestGroupTest, IgnoreDeleted) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  constexpr int64_t kSequenceNumber = 1L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifest with ADDED and DELETED entries
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/added.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kDeleted, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/deleted.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kExisting, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/existing.parquet", part_value,
+                             partitioned_spec_->spec_id()))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+  auto read_back_manifest =
+      WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, 
data_manifest);
+
+  // Create ManifestGroup with IgnoreDeleted
+  std::vector<ManifestFile> manifests = {read_back_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->IgnoreDeleted();
+
+  // Plan files - should only return ADDED and EXISTING
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks),
+              testing::UnorderedElementsAre("/path/to/added.parquet",
+                                            "/path/to/existing.parquet"));
+}
+
+TEST_P(ManifestGroupTest, IgnoreExisting) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  constexpr int64_t kSequenceNumber = 1L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifest with ADDED and EXISTING entries
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/added.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kExisting, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/existing.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kDeleted, kSnapshotId, kSequenceNumber,
+                MakeDataFile("/path/to/deleted.parquet", part_value,
+                             partitioned_spec_->spec_id()))};
+
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+  auto read_back_manifest =
+      WriteAndReadManifestListEntry(version, kSnapshotId, kSequenceNumber, 
data_manifest);
+
+  // Create ManifestGroup with IgnoreExisting
+  std::vector<ManifestFile> manifests = {read_back_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->IgnoreExisting();
+
+  // Plan files - should only return ADDED
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/added.parquet",
+                                                             
"/path/to/deleted.parquet"));
+}
+
+TEST_P(ManifestGroupTest, CustomManifestEntriesFilter) {
+  int version = GetParam();
+
+  constexpr int64_t kSnapshotId = 1000L;
+  const auto part_value = PartitionValues({Literal::Int(0)});
+
+  // Create data manifest with multiple files
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data1.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data2.parquet", part_value,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kAdded, kSnapshotId, /*sequence_number=*/1,
+                MakeDataFile("/path/to/data3.parquet", part_value,
+                             partitioned_spec_->spec_id()))};
+  auto data_manifest =
+      WriteDataManifest(version, kSnapshotId, std::move(data_entries), 
partitioned_spec_);
+
+  // Create ManifestGroup with custom entry filter
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+  group->FilterManifestEntries([](const ManifestEntry& entry) {
+    // Only include files with "data1" or "data3" in the path
+    return entry.data_file->file_path.find("data1") != std::string::npos ||
+           entry.data_file->file_path.find("data3") != std::string::npos;
+  });
+
+  // Plan files - should only return filtered entries
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             
"/path/to/data3.parquet"));
+}
+
+TEST_P(ManifestGroupTest, EmptyManifestGroup) {
+  std::vector<ManifestFile> manifests;
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  EXPECT_TRUE(tasks.empty());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, group->Entries());
+  EXPECT_TRUE(entries.empty());
+}
+
+TEST_P(ManifestGroupTest, MultipleDataManifests) {
+  int version = GetParam();
+
+  const auto partition_a = PartitionValues({Literal::Int(0)});
+  const auto partition_b = PartitionValues({Literal::Int(1)});
+
+  // Create first data manifest
+  std::vector<ManifestEntry> data_entries_1{MakeEntry(
+      ManifestStatus::kAdded, /*snapshot_id=*/1000L, /*sequence_number=*/1,
+      MakeDataFile("/path/to/data1.parquet", partition_a, 
partitioned_spec_->spec_id()))};
+  auto data_manifest_1 = WriteDataManifest(version, /*snapshot_id=*/1000L,
+                                           std::move(data_entries_1), 
partitioned_spec_);
+
+  // Create second data manifest
+  std::vector<ManifestEntry> data_entries_2{MakeEntry(
+      ManifestStatus::kAdded, /*snapshot_id=*/1001L, /*sequence_number=*/2,
+      MakeDataFile("/path/to/data2.parquet", partition_b, 
partitioned_spec_->spec_id()))};
+  auto data_manifest_2 = WriteDataManifest(version, /*snapshot_id=*/1001L,
+                                           std::move(data_entries_2), 
partitioned_spec_);
+
+  // Create ManifestGroup with multiple manifests
+  std::vector<ManifestFile> manifests = {data_manifest_1, data_manifest_2};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+
+  // Plan files - should return files from both manifests
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 2);
+  EXPECT_THAT(GetPaths(tasks), 
testing::UnorderedElementsAre("/path/to/data1.parquet",
+                                                             
"/path/to/data2.parquet"));
+}
+
+TEST_P(ManifestGroupTest, PartitionFilter) {
+  int version = GetParam();
+
+  // Create two files with different partition values (bucket 0 and bucket 1)
+  const auto partition_bucket_0 = PartitionValues({Literal::Int(0)});
+  const auto partition_bucket_1 = PartitionValues({Literal::Int(1)});
+
+  // Create data manifest with two entries in different partitions
+  std::vector<ManifestEntry> data_entries{
+      MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                MakeDataFile("/path/to/bucket0.parquet", partition_bucket_0,
+                             partitioned_spec_->spec_id())),
+      MakeEntry(ManifestStatus::kAdded, /*snapshot_id=*/1000L, 
/*sequence_number=*/1,
+                MakeDataFile("/path/to/bucket1.parquet", partition_bucket_1,
+                             partitioned_spec_->spec_id()))};
+  auto data_manifest = WriteDataManifest(version, /*snapshot_id=*/1000L,
+                                         std::move(data_entries), 
partitioned_spec_);
+
+  // Create ManifestGroup with partition filter for bucket 0
+  std::vector<ManifestFile> manifests = {data_manifest};
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto group,
+      ManifestGroup::Make(file_io_, schema_, GetSpecsById(), 
std::move(manifests)));
+
+  // Filter on partition field name (data_bucket_16_2 = 0)
+  auto partition_filter = Expressions::Equal("data_bucket_16_2", 
Literal::Int(0));
+  group->FilterPartitions(std::move(partition_filter));
+
+  // Plan files - should only return the file in bucket 0
+  ICEBERG_UNWRAP_OR_FAIL(auto tasks, group->PlanFiles());
+  ASSERT_EQ(tasks.size(), 1);
+  EXPECT_THAT(GetPaths(tasks), 
testing::ElementsAre("/path/to/bucket0.parquet"));
+}
+
// Run the whole suite against table format versions 1, 2, and 3; individual
// tests skip themselves where a feature (e.g. delete files) needs v2+.
INSTANTIATE_TEST_SUITE_P(ManifestGroupVersions, ManifestGroupTest,
                         testing::Values(1, 2, 3));
+
+}  // namespace iceberg
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 6c7e6b77..65afeb87 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -130,6 +130,13 @@ class Literal;
 class Term;
 class UnboundPredicate;
 
+/// \brief Evaluator.
+class Evaluator;
+class InclusiveMetricsEvaluator;
+class ManifestEvaluator;
+class ResidualEvaluator;
+class StrictMetricsEvaluator;
+
 /// \brief Scan.
 class DataTableScan;
 class FileScanTask;
@@ -144,6 +151,7 @@ struct ManifestEntry;
 struct ManifestFile;
 struct ManifestList;
 struct PartitionFieldSummary;
+class ManifestGroup;
 class ManifestListReader;
 class ManifestListWriter;
 class ManifestReader;

Reply via email to