This is an automated email from the ASF dual-hosted git repository.

wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new bf262180 feat(manifest): add ManifestFilterManager and 
ManifestMergeManager (#652)
bf262180 is described below

commit bf26218098778356b9f8d703478ddd2235d7cc68
Author: Guotao Yu <[email protected]>
AuthorDate: Wed May 20 14:19:01 2026 +0800

    feat(manifest): add ManifestFilterManager and ManifestMergeManager (#652)
    
    Implement two manifest management classes for table write operations:
    
    - ManifestFilterManager: filters manifest entries by row filter
    expression, file path, or partition value; supports
    FailMissingDeletePaths validation. Rewrites manifests that contain
    matching files, marking entries as DELETED; passes through manifests
    that cannot contain matching files unchanged.
    
    - ManifestMergeManager: merges small manifests using greedy bin-packing,
    grouping by partition_spec_id (manifests with different specs are never
    merged). Oversized manifests pass through unchanged. ADDED entries from
    prior manifests become EXISTING when merged (matching Java semantics).
---
 src/iceberg/CMakeLists.txt                       |   2 +
 src/iceberg/manifest/manifest_filter_manager.cc  | 423 +++++++++++++++++++++++
 src/iceberg/manifest/manifest_filter_manager.h   | 229 ++++++++++++
 src/iceberg/manifest/manifest_merge_manager.cc   | 191 ++++++++++
 src/iceberg/manifest/manifest_merge_manager.h    | 114 ++++++
 src/iceberg/manifest/manifest_writer.h           |   5 +
 src/iceberg/manifest/meson.build                 |   2 +
 src/iceberg/meson.build                          |   2 +
 src/iceberg/schema.cc                            |   2 +
 src/iceberg/schema.h                             |   2 +
 src/iceberg/test/CMakeLists.txt                  |   2 +
 src/iceberg/test/manifest_filter_manager_test.cc | 382 ++++++++++++++++++++
 src/iceberg/test/manifest_merge_manager_test.cc  | 355 +++++++++++++++++++
 13 files changed, 1711 insertions(+)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 79298b1a..68cacebe 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -45,8 +45,10 @@ set(ICEBERG_SOURCES
     location_provider.cc
     manifest/manifest_adapter.cc
     manifest/manifest_entry.cc
+    manifest/manifest_filter_manager.cc
     manifest/manifest_group.cc
     manifest/manifest_list.cc
+    manifest/manifest_merge_manager.cc
     manifest/manifest_reader.cc
     manifest/manifest_util.cc
     manifest/manifest_writer.cc
diff --git a/src/iceberg/manifest/manifest_filter_manager.cc 
b/src/iceberg/manifest/manifest_filter_manager.cc
new file mode 100644
index 00000000..086c94a7
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.cc
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/expression/residual_evaluator.h"
+#include "iceberg/expression/strict_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+using PartitionSpecsById = ManifestFilterManager::PartitionSpecsById;
+
+bool HasRowFilterExpression(const std::shared_ptr<Expression>& expr) {
+  return expr != nullptr && expr->op() != Expression::Operation::kFalse;
+}
+
+Result<std::shared_ptr<PartitionSpec>> PartitionSpecById(
+    const PartitionSpecsById& specs_by_id, int32_t spec_id) {
+  auto iter = specs_by_id.find(spec_id);
+  if (iter == specs_by_id.end() || iter->second == nullptr) {
+    return NotFound("Partition spec with ID {} is not found", spec_id);
+  }
+  return iter->second;
+}
+
+Result<std::string> FormatPartitionPath(const PartitionSpecsById& specs_by_id,
+                                        const DataFile& file, int32_t spec_id) 
{
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id));
+  return spec->PartitionPath(file.partition);
+}
+
+}  // namespace
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+                                             std::shared_ptr<FileIO> file_io)
+    : manifest_content_(content),
+      file_io_(std::move(file_io)),
+      delete_expr_(Expressions::AlwaysFalse()) {}
+
+ManifestFilterManager::~ManifestFilterManager() = default;
+
+Status ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression> 
expr) {
+  ICEBERG_PRECHECK(expr != nullptr, "Cannot delete files using filter: null");
+  ICEBERG_ASSIGN_OR_RAISE(delete_expr_, Or::MakeFolded(delete_expr_, 
std::move(expr)));
+  manifest_evaluator_cache_.clear();
+  residual_evaluator_cache_.clear();
+  return {};
+}
+
+void ManifestFilterManager::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  manifest_evaluator_cache_.clear();
+  residual_evaluator_cache_.clear();
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+  delete_paths_.insert(std::string(path));
+}
+
+Status ManifestFilterManager::DeleteFile(std::shared_ptr<DataFile> file) {
+  ICEBERG_PRECHECK(file != nullptr, "Cannot delete file: null");
+  delete_paths_.insert(file->file_path);
+  delete_files_.insert(std::move(file));
+  return {};
+}
+
+const DataFileSet& ManifestFilterManager::FilesToBeDeleted() const {
+  return delete_files_;
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  drop_partitions_.add(spec_id, std::move(partition));
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+  fail_missing_delete_paths_ = true;
+}
+
+void ManifestFilterManager::FailAnyDelete() { fail_any_delete_ = true; }
+
+bool ManifestFilterManager::ContainsDeletes() const {
+  return HasRowFilterExpression(delete_expr_) || !delete_paths_.empty() ||
+         !drop_partitions_.empty();
+}
+
+Result<bool> ManifestFilterManager::CanContainDroppedFiles(const 
ManifestFile&) const {
+  // TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete
+  // manifests once object-delete partitions are tracked separately.
+  // Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based 
delete,
+  // which forces scanning all manifests.
+  return !delete_paths_.empty();
+}
+
+Result<bool> ManifestFilterManager::CanContainDroppedPartitions(
+    const ManifestFile& manifest) const {
+  if (drop_partitions_.empty()) return false;
+  // TODO(Guotao): Use partition_summaries bounds to skip manifests that cannot
+  // contain any dropped partition, instead of only matching partition spec 
IDs.
+  // Only manifests whose partition spec matches a registered drop can contain
+  // entries for that partition.  PartitionKey is pair<spec_id, values>.
+  int32_t spec_id = manifest.partition_spec_id;
+  for (const auto& key : drop_partitions_) {
+    if (key.first == spec_id) return true;
+  }
+  return false;
+}
+
+Result<bool> ManifestFilterManager::CanContainExpressionDeletes(
+    const ManifestFile& manifest, const std::shared_ptr<Schema>& schema,
+    const PartitionSpecsById& specs_by_id) {
+  if (!HasRowFilterExpression(delete_expr_)) return false;
+  int32_t spec_id = manifest.partition_spec_id;
+  ICEBERG_ASSIGN_OR_RAISE(auto* evaluator,
+                          GetManifestEvaluator(schema, specs_by_id, spec_id));
+  return evaluator->Evaluate(manifest);
+}
+
+Result<bool> ManifestFilterManager::CanContainDeletedFiles(
+    const ManifestFile& manifest, const std::shared_ptr<Schema>& schema,
+    const PartitionSpecsById& specs_by_id, bool trust_manifest_references) {
+  // A manifest with no live files cannot contain files to delete.
+  // Missing counts mean the count is unknown; treat it as possibly non-zero.
+  bool has_live = !manifest.added_files_count.has_value() ||
+                  manifest.added_files_count.value() > 0 ||
+                  !manifest.existing_files_count.has_value() ||
+                  manifest.existing_files_count.value() > 0;
+  if (!has_live) return false;
+
+  if (trust_manifest_references) {
+    // TODO(Guotao): Return whether this manifest is in the referenced 
manifest set.
+    return true;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto can_contain_dropped_files,
+                          CanContainDroppedFiles(manifest));
+  if (can_contain_dropped_files) return true;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto can_contain_expression_deletes,
+                          CanContainExpressionDeletes(manifest, schema, 
specs_by_id));
+  if (can_contain_expression_deletes) return true;
+
+  return CanContainDroppedPartitions(manifest);
+}
+
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+    const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+    int32_t spec_id) {
+  auto& evaluator = manifest_evaluator_cache_[spec_id];
+  if (!evaluator) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, 
spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(evaluator, ManifestEvaluator::MakeRowFilter(
+                                           delete_expr_, spec, *schema, 
case_sensitive_));
+  }
+  return evaluator.get();
+}
+
+Result<ResidualEvaluator*> ManifestFilterManager::GetResidualEvaluator(
+    const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+    int32_t spec_id) {
+  auto& evaluator = residual_evaluator_cache_[spec_id];
+  if (!evaluator) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, 
spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(evaluator, ResidualEvaluator::Make(delete_expr_, 
*spec,
+                                                               *schema, 
case_sensitive_));
+  }
+  return evaluator.get();
+}
+
+Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+                                                 const 
std::shared_ptr<Schema>& schema,
+                                                 const PartitionSpecsById& 
specs_by_id,
+                                                 int32_t manifest_spec_id) {
+  if (!entry.data_file) return false;
+  const DataFile& file = *entry.data_file;
+  int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);
+
+  // Path-based and partition-drop checks
+  if (delete_paths_.count(file.file_path) ||
+      drop_partitions_.contains(spec_id, file.partition)) {
+    if (fail_any_delete_) {
+      ICEBERG_ASSIGN_OR_RAISE(auto partition_path,
+                              FormatPartitionPath(specs_by_id, file, spec_id));
+      return InvalidArgument("Operation would delete existing data: {}", 
partition_path);
+    }
+    return true;
+  }
+
+  if (HasRowFilterExpression(delete_expr_)) {
+    ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval,
+                            GetResidualEvaluator(schema, specs_by_id, 
spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto residual_expr,
+                            residual_eval->ResidualFor(file.partition));
+    // TODO(Guotao): Cache strict/inclusive metrics evaluators per partition 
residual.
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto strict_eval,
+        StrictMetricsEvaluator::Make(residual_expr, schema, case_sensitive_));
+    ICEBERG_ASSIGN_OR_RAISE(auto strict_match, strict_eval->Evaluate(file));
+    if (strict_match) {
+      if (fail_any_delete_) {
+        ICEBERG_ASSIGN_OR_RAISE(auto partition_path,
+                                FormatPartitionPath(specs_by_id, file, 
spec_id));
+        return InvalidArgument("Operation would delete existing data: {}",
+                               partition_path);
+      }
+      return true;
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto incl_eval, InclusiveMetricsEvaluator::Make(
+                                                residual_expr, *schema, 
case_sensitive_));
+    ICEBERG_ASSIGN_OR_RAISE(auto incl_match, incl_eval->Evaluate(file));
+    if (incl_match) {
+      if (manifest_content_ == ManifestContent::kDeletes) {
+        return false;
+      }
+      return InvalidArgument(
+          "Cannot delete file where some, but not all, rows match filter: {}",
+          file.file_path);
+    }
+  }
+
+  return false;
+}
+
+bool ManifestFilterManager::CanTrustManifestReferences(
+    const std::vector<const ManifestFile*>&) const {
+  // TODO(Guotao): Track source manifest locations for object deletes so 
manifests
+  // outside the referenced set can be skipped before any other delete checks.
+  return false;
+}
+
+Result<ManifestFile> ManifestFilterManager::FilterManifest(
+    const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+    const ManifestFile& manifest, bool trust_manifest_references,
+    const ManifestWriterFactory& writer_factory,
+    std::unordered_set<std::string>& found_paths) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto can_contain_deleted_files,
+      CanContainDeletedFiles(manifest, schema, specs_by_id, 
trust_manifest_references));
+  if (!can_contain_deleted_files) {
+    return manifest;
+  }
+
+  int32_t spec_id = manifest.partition_spec_id;
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                          ManifestReader::Make(manifest, file_io_, schema, 
spec));
+  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());
+
+  ICEBERG_ASSIGN_OR_RAISE(auto has_deleted_files,
+                          ManifestHasDeletedFiles(entries, schema, 
specs_by_id, spec_id));
+  if (!has_deleted_files) {
+    return manifest;
+  }
+
+  return FilterManifestWithDeletedFiles(entries, spec_id, schema, specs_by_id,
+                                        writer_factory, found_paths);
+}
+
+Result<bool> ManifestFilterManager::ManifestHasDeletedFiles(
+    const std::vector<ManifestEntry>& entries, const std::shared_ptr<Schema>& 
schema,
+    const PartitionSpecsById& specs_by_id, int32_t manifest_spec_id) {
+  for (const auto& entry : entries) {
+    ICEBERG_ASSIGN_OR_RAISE(auto should_delete,
+                            ShouldDelete(entry, schema, specs_by_id, 
manifest_spec_id));
+    if (should_delete) {
+      return true;
+    }
+  }
+  return false;
+}
+
+Result<ManifestFile> ManifestFilterManager::FilterManifestWithDeletedFiles(
+    const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
+    const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+    const ManifestWriterFactory& writer_factory,
+    std::unordered_set<std::string>& found_paths) {
+  ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                          writer_factory(manifest_spec_id, manifest_content_));
+  for (const auto& entry : entries) {
+    ICEBERG_ASSIGN_OR_RAISE(auto should_delete,
+                            ShouldDelete(entry, schema, specs_by_id, 
manifest_spec_id));
+    if (should_delete) {
+      if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) {
+        found_paths.insert(entry.data_file->file_path);
+      }
+      if (entry.data_file) {
+        // TODO(Guotao): Track duplicate deletes and avoid full DataFile 
copies when
+        // summary generation can use lighter records.
+        delete_files_.insert(std::make_shared<DataFile>(*entry.data_file));
+      }
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+    } else {
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+  return writer->ToManifestFile();
+}
+
+Status ManifestFilterManager::ValidateRequiredDeletes(
+    const std::unordered_set<std::string>& found_paths) const {
+  if (!fail_missing_delete_paths_) {
+    return {};
+  }
+
+  std::string missing;
+  for (const auto& path : delete_paths_) {
+    if (!found_paths.count(path)) {
+      if (!missing.empty()) missing += ", ";
+      missing += path;
+    }
+  }
+  if (!missing.empty()) {
+    return InvalidArgument("Missing delete paths: {}", missing);
+  }
+  return {};
+}
+
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+    const TableMetadata& metadata, const std::shared_ptr<Snapshot>& 
base_snapshot,
+    const ManifestWriterFactory& writer_factory) {
+  if (!base_snapshot) {
+    ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes({}));
+    return std::vector<ManifestFile>{};
+  }
+
+  ICEBERG_PRECHECK(file_io_ != nullptr, "Cannot filter manifests: FileIO is 
null");
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto list_reader, ManifestListReader::Make(base_snapshot->manifest_list, 
file_io_));
+  ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());
+
+  std::vector<const ManifestFile*> manifests;
+  manifests.reserve(all_manifests.size());
+  for (auto& manifest : all_manifests) {
+    manifests.push_back(&manifest);
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  TableMetadataCache metadata_cache(&metadata);
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  return FilterManifests(schema, specs_by_id.get(), manifests, writer_factory);
+}
+
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+    const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+    const std::vector<const ManifestFile*>& input_manifests,
+    const ManifestWriterFactory& writer_factory) {
+  ICEBERG_PRECHECK(schema != nullptr, "Cannot filter manifests: schema is 
null");
+  ICEBERG_PRECHECK(file_io_ != nullptr, "Cannot filter manifests: FileIO is 
null");
+
+  std::vector<const ManifestFile*> manifests;
+  manifests.reserve(input_manifests.size());
+  for (const auto* manifest : input_manifests) {
+    ICEBERG_PRECHECK(manifest != nullptr, "Cannot filter manifests: manifest 
is null");
+    if (manifest->content == manifest_content_) {
+      manifests.push_back(manifest);
+    }
+  }
+
+  std::unordered_set<std::string> found_paths;
+  if (manifests.empty()) {
+    ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));
+    return std::vector<ManifestFile>{};
+  }
+
+  bool trust_manifest_references = CanTrustManifestReferences(manifests);
+  manifest_evaluator_cache_.clear();
+  residual_evaluator_cache_.clear();
+
+  // TODO(Guotao): Parallelize manifest filtering with per-manifest results, 
then
+  // merge found paths and deleted files after the loop.
+  std::vector<ManifestFile> filtered;
+  filtered.reserve(manifests.size());
+  for (const auto* manifest_ptr : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto filtered_manifest,
+        FilterManifest(schema, specs_by_id, *manifest_ptr, 
trust_manifest_references,
+                       writer_factory, found_paths));
+    filtered.push_back(std::move(filtered_manifest));
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));
+  return filtered;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_filter_manager.h 
b/src/iceberg/manifest/manifest_filter_manager.h
new file mode 100644
index 00000000..55258b2b
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.h
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_filter_manager.h
+/// Filters an existing snapshot's manifest list, marking data files as DELETED
+/// or EXISTING based on row-filter expressions, exact path deletes, and 
partition drops.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/data_file_set.h"
+#include "iceberg/util/partition_value_util.h"
+
+namespace iceberg {
+
+/// \brief Filters an existing snapshot's manifest list.
+///
+/// The manager accumulates delete conditions incrementally, then applies them 
all
+/// at once in a single FilterManifests() call.  Manifests that contain no 
deleted
+/// entries are returned unchanged (no I/O).  Manifests that do contain deleted
+/// entries are rewritten with those entries marked DELETED.
+///
+/// The manager is content-agnostic: pass ManifestContent::kData to process 
data
+/// manifests, or ManifestContent::kDeletes to process delete manifests.
+///
+/// TODO(Guotao): For ManifestContent::kDeletes, implement cleanup for orphan 
delete files
+/// and dangling deletion vectors.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestFilterManager {
+ public:
+  using PartitionSpecsById = std::unordered_map<int32_t, 
std::shared_ptr<PartitionSpec>>;
+
+  ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO> 
file_io);
+  ~ManifestFilterManager();
+
+  ManifestFilterManager(const ManifestFilterManager&) = delete;
+  ManifestFilterManager& operator=(const ManifestFilterManager&) = delete;
+
+  /// \brief Register a row-filter expression.
+  ///
+  /// Any manifest entry whose column metrics indicate the file may satisfy the
+  /// expression will be marked DELETED.
+  ///
+  /// \param expr The expression to match files against
+  Status DeleteByRowFilter(std::shared_ptr<Expression> expr);
+
+  /// \brief Set whether row-filter field binding is case-sensitive.
+  void CaseSensitive(bool case_sensitive);
+
+  /// \brief Register an exact file path for deletion.
+  ///
+  /// Any manifest entry whose file_path matches this path will be marked 
DELETED.
+  ///
+  /// \param path The exact file path to delete
+  void DeleteFile(std::string_view path);
+
+  /// \brief Register a file object for deletion.
+  ///
+  /// Any manifest entry whose file_path matches file->file_path will be marked
+  /// DELETED.  The file object is retained in FilesToBeDeleted(), allowing 
callers
+  /// to enumerate deleted file objects for follow-up delete-file cleanup.
+  /// Duplicate registrations (same path) are silently ignored.
+  ///
+  /// \param file The data/delete file to delete (must not be null)
+  Status DeleteFile(std::shared_ptr<DataFile> file);
+
+  /// \brief Returns the set of file objects marked for deletion by this 
manager.
+  ///
+  /// This includes files registered via DeleteFile(DataFile) and files 
discovered
+  /// during FilterManifests() that were deleted by path, partition, or 
row-filter
+  /// matching.  Used by higher-level operations (e.g. RowDelta) to enumerate 
the
+  /// deleted data files for delete-file cleanup.
+  const DataFileSet& FilesToBeDeleted() const;
+
+  /// \brief Register a partition for dropping.
+  ///
+  /// Any manifest entry whose (spec_id, partition) pair matches will be 
marked DELETED.
+  ///
+  /// \param spec_id The partition spec ID
+  /// \param partition The partition values to drop
+  void DropPartition(int32_t spec_id, PartitionValues partition);
+
+  /// \brief Set a flag that makes FilterManifests() fail if any registered
+  /// delete path was not found in any manifest entry.
+  void FailMissingDeletePaths();
+
+  /// \brief Set a flag that makes FilterManifests() return an error if any
+  /// manifest entry matches a delete condition.
+  void FailAnyDelete();
+
+  /// \brief Returns true if any delete condition has been registered.
+  bool ContainsDeletes() const;
+
+  /// \brief Apply all accumulated delete conditions to the base snapshot's 
manifests.
+  ///
+  /// Manifests that cannot possibly contain deleted files are returned 
unchanged.
+  /// Manifests that do contain deleted files are rewritten using 
writer_factory.
+  ///
+  /// \param metadata Table metadata (provides specs and schema for evaluators)
+  /// \param base_snapshot The snapshot whose manifests to filter (may be null)
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The filtered manifest list, or an error
+  Result<std::vector<ManifestFile>> FilterManifests(
+      const TableMetadata& metadata, const std::shared_ptr<Snapshot>& 
base_snapshot,
+      const ManifestWriterFactory& writer_factory);
+
+  /// \brief Apply all accumulated delete conditions to the provided manifests.
+  ///
+  /// This overload accepts only the context needed for filtering.  It is 
intended for
+  /// callers that already have the active schema, partition specs, and 
manifest list.
+  ///
+  /// \param schema Active schema to bind row-filter expressions and metrics 
evaluators
+  /// \param specs_by_id All partition specs keyed by spec ID
+  /// \param manifests Manifest descriptors to filter
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The filtered manifest list, or an error
+  Result<std::vector<ManifestFile>> FilterManifests(
+      const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+      const std::vector<const ManifestFile*>& manifests,
+      const ManifestWriterFactory& writer_factory);
+
+ private:
+  /// \brief Returns true if the manifest might contain files matching any 
expression.
+  Result<bool> CanContainExpressionDeletes(const ManifestFile& manifest,
+                                           const std::shared_ptr<Schema>& 
schema,
+                                           const PartitionSpecsById& 
specs_by_id);
+
+  /// \brief Returns true if the manifest might contain files in a dropped 
partition.
+  ///
+  /// Checks whether the manifest's partition_spec_id matches any spec_id 
registered
+  /// via DropPartition().  Manifests from a different spec cannot contain the 
dropped
+  /// partition values.
+  Result<bool> CanContainDroppedPartitions(const ManifestFile& manifest) const;
+
+  /// \brief Returns true if the manifest might contain path-deleted files.
+  Result<bool> CanContainDroppedFiles(const ManifestFile& manifest) const;
+
+  /// \brief Returns true if the manifest possibly contains any deleted file.
+  Result<bool> CanContainDeletedFiles(const ManifestFile& manifest,
+                                      const std::shared_ptr<Schema>& schema,
+                                      const PartitionSpecsById& specs_by_id,
+                                      bool trust_manifest_references);
+
+  bool CanTrustManifestReferences(
+      const std::vector<const ManifestFile*>& manifests) const;
+
+  Result<ManifestFile> FilterManifest(const std::shared_ptr<Schema>& schema,
+                                      const PartitionSpecsById& specs_by_id,
+                                      const ManifestFile& manifest,
+                                      bool trust_manifest_references,
+                                      const ManifestWriterFactory& 
writer_factory,
+                                      std::unordered_set<std::string>& 
found_paths);
+
+  Result<bool> ManifestHasDeletedFiles(const std::vector<ManifestEntry>& 
entries,
+                                       const std::shared_ptr<Schema>& schema,
+                                       const PartitionSpecsById& specs_by_id,
+                                       int32_t manifest_spec_id);
+
+  Result<ManifestFile> FilterManifestWithDeletedFiles(
+      const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
+      const std::shared_ptr<Schema>& schema, const PartitionSpecsById& 
specs_by_id,
+      const ManifestWriterFactory& writer_factory,
+      std::unordered_set<std::string>& found_paths);
+
+  Status ValidateRequiredDeletes(
+      const std::unordered_set<std::string>& found_paths) const;
+
+  /// \brief Get or create a ManifestEvaluator for the given spec.
+  Result<ManifestEvaluator*> GetManifestEvaluator(const 
std::shared_ptr<Schema>& schema,
+                                                  const PartitionSpecsById& 
specs_by_id,
+                                                  int32_t spec_id);
+
+  /// \brief Get or create a ResidualEvaluator for the given spec.
+  Result<ResidualEvaluator*> GetResidualEvaluator(const 
std::shared_ptr<Schema>& schema,
+                                                  const PartitionSpecsById& 
specs_by_id,
+                                                  int32_t spec_id);
+
+  /// \brief Check whether a single entry should be deleted.
+  Result<bool> ShouldDelete(const ManifestEntry& entry,
+                            const std::shared_ptr<Schema>& schema,
+                            const PartitionSpecsById& specs_by_id,
+                            int32_t manifest_spec_id);
+
+  const ManifestContent manifest_content_;
+  std::shared_ptr<FileIO> file_io_;
+
+  std::shared_ptr<Expression> delete_expr_;
+  std::unordered_set<std::string> delete_paths_;
+  DataFileSet delete_files_;
+  PartitionSet drop_partitions_;
+  bool fail_missing_delete_paths_{false};
+  bool fail_any_delete_{false};
+  bool case_sensitive_{true};
+
+  std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
+      manifest_evaluator_cache_;
+  std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>
+      residual_evaluator_cache_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.cc 
b/src/iceberg/manifest/manifest_merge_manager.cc
new file mode 100644
index 00000000..056dce3f
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.cc
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <algorithm>
+#include <array>
+#include <iterator>
+#include <map>
+#include <ranges>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+                                           int32_t min_count_to_merge, bool 
merge_enabled)
+    : target_size_bytes_(target_size_bytes),
+      min_count_to_merge_(min_count_to_merge),
+      merge_enabled_(merge_enabled) {}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Combine new then existing (new-first ordering is preserved in output).
+  auto to_manifest_ptr = [](const ManifestFile& manifest) { return &manifest; 
};
+  auto manifest_ranges = std::array{
+      new_manifests | std::views::transform(to_manifest_ptr),
+      existing_manifests | std::views::transform(to_manifest_ptr),
+  };
+
+  std::vector<const ManifestFile*> all;
+  all.reserve(new_manifests.size() + existing_manifests.size());
+  std::ranges::copy(manifest_ranges | std::views::join, 
std::back_inserter(all));
+
+  if (all.empty() || !merge_enabled_) {
+    return all |
+           std::views::transform([](const ManifestFile* manifest) { return 
*manifest; }) |
+           std::ranges::to<std::vector<ManifestFile>>();
+  }
+
+  // Track the first (newest) manifest independently per content type.
+  std::map<ManifestContent, const ManifestFile*> first_by_content;
+  std::ranges::for_each(all, [&first_by_content](const ManifestFile* manifest) 
{
+    first_by_content.try_emplace(manifest->content, manifest);
+  });
+
+  // Group manifests by (partition_spec_id, content), never merging across 
specs or
+  // content types. Reverse spec ordering preserves v3 first-row-id assignment 
order.
+  using GroupKey = std::pair<int32_t, ManifestContent>;
+  auto group_key = [](const ManifestFile* manifest) {
+    return GroupKey{manifest->partition_spec_id, manifest->content};
+  };
+
+  std::map<GroupKey, std::vector<const ManifestFile*>, std::greater<>> by_spec;
+  std::ranges::for_each(all, [&by_spec, &group_key](const ManifestFile* 
manifest) {
+    by_spec[group_key(manifest)].push_back(manifest);
+  });
+
+  std::vector<ManifestFile> result;
+  result.reserve(all.size());
+  for (auto& [key, group] : by_spec) {
+    const auto* first = first_by_content.at(key.second);
+    ICEBERG_ASSIGN_OR_RAISE(auto merged, MergeGroup(group, first, snapshot_id, 
metadata,
+                                                    file_io, writer_factory));
+    std::ranges::move(merged, std::back_inserter(result));
+  }
+  return result;
+}
+
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
+    const std::vector<const ManifestFile*>& group, const ManifestFile* first,
+    int64_t snapshot_id, const TableMetadata& metadata, 
std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Match packEnd(group, ManifestFile::length) with lookback 1:
+  //   1. Process manifests in reverse order (oldest-first).
+  //   2. Greedy forward-pack with lookback=1: emit the current bin when the 
next item
+  //      doesn't fit, then start a new bin.
+  //   3. Reverse each bin (restoring original item order within a bin).
+  //   4. Reverse the bin list (newest manifest's bin ends up first).
+  // Effect: the newest manifest is in the first, possibly under-filled, bin.
+  std::vector<std::vector<const ManifestFile*>> bins;
+  std::vector<const ManifestFile*> current_bin;
+  int64_t bin_size = 0;
+
+  for (const auto* manifest : std::views::reverse(group)) {
+    if (!current_bin.empty() &&
+        bin_size + manifest->manifest_length > target_size_bytes_) {
+      bins.push_back(std::move(current_bin));
+      current_bin.clear();
+      bin_size = 0;
+    }
+    current_bin.push_back(manifest);
+    bin_size += manifest->manifest_length;
+  }
+  if (!current_bin.empty()) {
+    bins.push_back(std::move(current_bin));
+  }
+
+  for (auto& bin : bins) {
+    std::ranges::reverse(bin);
+  }
+  std::ranges::reverse(bins);
+
+  // Process each bin: if the bin contains the newest manifest and is too 
small,
+  // pass its contents through unchanged.
+  std::vector<ManifestFile> result;
+  result.reserve(group.size());
+  // TODO(Guotao): Flush independent bins in parallel and cache successful 
merged bins
+  // for commit retries.
+  for (auto& bin : bins) {
+    bool contains_first = std::ranges::find(bin, first) != bin.end();
+    if (contains_first && std::cmp_less(bin.size(), min_count_to_merge_)) {
+      for (const auto* manifest : bin) {
+        result.push_back(*manifest);
+      }
+    } else {
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto merged, FlushBin(bin, snapshot_id, metadata, file_io, 
writer_factory));
+      result.push_back(std::move(merged));
+    }
+  }
+
+  return result;
+}
+
+Result<ManifestFile> ManifestMergeManager::FlushBin(
+    const std::vector<const ManifestFile*>& bin, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // A single-manifest bin requires no merging.
+  if (bin.size() == 1) return *bin[0];
+
+  const ManifestFile& first = *bin[0];
+  int32_t spec_id = first.partition_spec_id;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, first.content));
+
+  for (const auto* manifest : bin) {
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(*manifest, file_io, schema, 
spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      bool is_current =
+          entry.snapshot_id.has_value() && entry.snapshot_id.value() == 
snapshot_id;
+      if (entry.status == ManifestStatus::kDeleted) {
+        // Carry forward only the current snapshot's deletes; drop older 
tombstones.
+        if (is_current) {
+          ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+        }
+      } else if (entry.status == ManifestStatus::kAdded && is_current) {
+        // Files added by the current snapshot retain their ADDED status.
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+      } else {
+        // Files added by prior snapshots (ADDED or EXISTING) become EXISTING.
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+      }
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+  return writer->ToManifestFile();
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.h 
b/src/iceberg/manifest/manifest_merge_manager.h
new file mode 100644
index 00000000..16cc8d98
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.h
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_merge_manager.h
+/// Merges small manifests into fewer larger ones according to table 
properties.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Merges small manifests into larger ones using greedy bin-packing.
+///
+/// Manifests are grouped by partition_spec_id before merging; manifests with
+/// different spec IDs are never merged together.  Within a group, manifests 
are
+/// accumulated into bins until a bin would exceed target_size_bytes, at which
+/// point the bin is flushed (written) and a new one started.  Manifests 
already
+/// larger than target_size_bytes pass through unchanged.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestMergeManager {
+ public:
+  /// \brief Construct a merge manager with the given configuration.
+  ///
+  /// \param target_size_bytes Target output manifest size in bytes
+  /// \param min_count_to_merge Minimum number of manifests before any merging 
occurs
+  /// \param merge_enabled Whether merging is enabled at all
+  ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge,
+                       bool merge_enabled);
+
+  ManifestMergeManager(const ManifestMergeManager&) = delete;
+  ManifestMergeManager& operator=(const ManifestMergeManager&) = delete;
+
+  /// \brief Merge existing and new manifests according to configured 
thresholds.
+  ///
+  /// Manifests are grouped by (partition_spec_id, content) — data and delete 
manifests
+  /// are never merged together.  Within each group, a greedy bin-packing 
algorithm
+  /// combines manifests up to target_size_bytes.  The bin that contains the 
newest
+  /// manifest for that content type is protected by min_count_to_merge: if it 
has fewer
+  /// than that many items it is passed through unchanged.
+  ///
+  /// \note Retry and rollback cleanup are handled by the caller that owns 
created
+  /// manifest paths.
+  /// TODO(Guotao): Add explicit replaced-manifest tracking here if callers 
need direct
+  /// access.
+  ///
+  /// \param existing_manifests Manifests already in the base snapshot
+  /// \param new_manifests Newly written manifests to incorporate
+  /// \param snapshot_id The ID of the snapshot being committed.  Used to 
preserve
+  ///   ADDED/DELETED status for entries written by this snapshot and to 
suppress
+  ///   stale DELETED tombstones from prior snapshots.
+  /// \param metadata Table metadata (provides specs and schema for readers)
+  /// \param file_io File IO used to open existing manifests for reading
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The merged manifest list, or an error
+  Result<std::vector<ManifestFile>> MergeManifests(
+      const std::vector<ManifestFile>& existing_manifests,
+      const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+      const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+      const ManifestWriterFactory& writer_factory);
+
+ private:
+  /// \brief Merge a group of manifests sharing the same spec_id.
+  ///
+  /// \param first The overall first (newest) manifest across all groups, used 
to
+  ///   apply the min_count_to_merge threshold on the bin that contains it.
+  Result<std::vector<ManifestFile>> MergeGroup(
+      const std::vector<const ManifestFile*>& group, const ManifestFile* first,
+      int64_t snapshot_id, const TableMetadata& metadata, 
std::shared_ptr<FileIO> file_io,
+      const ManifestWriterFactory& writer_factory);
+
+  /// \brief Write a merged manifest from all manifests in a bin.
+  ///
+  /// Entries are written snapshot-aware:
+  /// - ADDED from snapshot_id  → WriteAddedEntry (preserve status)
+  /// - DELETED from snapshot_id → WriteDeletedEntry (preserve tombstone)
+  /// - DELETED from older snapshots → dropped (stale tombstones are not 
carried forward)
+  /// - All other entries          → WriteExistingEntry
+  Result<ManifestFile> FlushBin(const std::vector<const ManifestFile*>& bin,
+                                int64_t snapshot_id, const TableMetadata& 
metadata,
+                                std::shared_ptr<FileIO> file_io,
+                                const ManifestWriterFactory& writer_factory);
+
+  const int64_t target_size_bytes_;
+  const int32_t min_count_to_merge_;
+  const bool merge_enabled_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_writer.h 
b/src/iceberg/manifest/manifest_writer.h
index cc57f25f..0eaf478d 100644
--- a/src/iceberg/manifest/manifest_writer.h
+++ b/src/iceberg/manifest/manifest_writer.h
@@ -22,6 +22,7 @@
 /// \file iceberg/manifest/manifest_writer.h
 /// Data writer interface for manifest files and manifest list files.
 
+#include <functional>
 #include <memory>
 #include <string>
 #include <vector>
@@ -163,6 +164,10 @@ class ICEBERG_EXPORT ManifestWriter {
   std::unique_ptr<PartitionSummary> partition_summary_;
 };
 
+/// \brief Factory type for creating ManifestWriter instances.
+using ManifestWriterFactory = 
std::function<Result<std::unique_ptr<ManifestWriter>>(
+    int32_t spec_id, ManifestContent content)>;
+
 /// \brief Write manifest files to a manifest list file.
 class ICEBERG_EXPORT ManifestListWriter {
  public:
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index 41e685ff..d4b039a6 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -18,8 +18,10 @@
 install_headers(
     [
         'manifest_entry.h',
+        'manifest_filter_manager.h',
         'manifest_group.h',
         'manifest_list.h',
+        'manifest_merge_manager.h',
         'manifest_reader.h',
         'manifest_writer.h',
         'rolling_manifest_writer.h',
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 0b5f269d..03dc2447 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -67,8 +67,10 @@ iceberg_sources = files(
     'location_provider.cc',
     'manifest/manifest_adapter.cc',
     'manifest/manifest_entry.cc',
+    'manifest/manifest_filter_manager.cc',
     'manifest/manifest_group.cc',
     'manifest/manifest_list.cc',
+    'manifest/manifest_merge_manager.cc',
     'manifest/manifest_reader.cc',
     'manifest/manifest_util.cc',
     'manifest/manifest_writer.cc',
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 00905378..5fdd4799 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -40,6 +40,8 @@ Schema::Schema(std::vector<SchemaField> fields, int32_t 
schema_id)
       schema_id_(schema_id),
       cache_(std::make_unique<SchemaCache>(this)) {}
 
+Schema::~Schema() = default;
+
 Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
                                              int32_t schema_id,
                                              std::vector<int32_t> 
identifier_field_ids) {
diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h
index 3c84bc2a..791ed5c8 100644
--- a/src/iceberg/schema.h
+++ b/src/iceberg/schema.h
@@ -57,6 +57,8 @@ class ICEBERG_EXPORT Schema : public StructType {
 
   explicit Schema(std::vector<SchemaField> fields, int32_t schema_id = 
kInitialSchemaId);
 
+  ~Schema() override;
+
   /// \brief Create a schema.
   ///
   /// \param fields The fields that make up the schema.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index afafc4c1..791ad9be 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -180,6 +180,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    delete_file_index_test.cc
                    manifest_group_test.cc
                    manifest_list_versions_test.cc
+                   manifest_merge_manager_test.cc
                    manifest_reader_stats_test.cc
                    manifest_reader_test.cc
                    manifest_writer_versions_test.cc
@@ -205,6 +206,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    SOURCES
                    expire_snapshots_test.cc
                    fast_append_test.cc
+                   manifest_filter_manager_test.cc
                    name_mapping_update_test.cc
                    snapshot_manager_test.cc
                    transaction_test.cc
diff --git a/src/iceberg/test/manifest_filter_manager_test.cc 
b/src/iceberg/test/manifest_filter_manager_test.cc
new file mode 100644
index 00000000..7810509f
--- /dev/null
+++ b/src/iceberg/test/manifest_filter_manager_test.cc
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+class ManifestFilterManagerTest : public MinimalUpdateTestBase {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    MinimalUpdateTestBase::SetUp();
+
+    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
+    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());
+
+    // Two files in different partitions (identity(x))
+    file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L);
+    file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L);
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path, int64_t 
partition_x) {
+    auto f = std::make_shared<DataFile>();
+    f->content = DataFile::Content::kData;
+    f->file_path = table_location_ + path;
+    f->file_format = FileFormatType::kParquet;
+    f->partition = 
PartitionValues(std::vector<Literal>{Literal::Long(partition_x)});
+    f->file_size_in_bytes = 1024;
+    f->record_count = 100;
+    f->partition_spec_id = spec_->spec_id();
+    return f;
+  }
+
+  // Append files, commit, refresh, and return the current snapshot.
+  Result<std::shared_ptr<Snapshot>> CommitFiles(
+      std::vector<std::shared_ptr<DataFile>> files) {
+    ICEBERG_ASSIGN_OR_RAISE(auto fa, table_->NewFastAppend());
+    for (const auto& f : files) fa->AppendFile(f);
+    ICEBERG_RETURN_UNEXPECTED(fa->Commit());
+    ICEBERG_RETURN_UNEXPECTED(table_->Refresh());
+    return table_->current_snapshot();
+  }
+
+  ManifestWriterFactory MakeWriterFactory(const TableMetadata& metadata) {
+    auto fv = metadata.format_version;
+    return [this, fv, &metadata](int32_t spec_id, ManifestContent content) 
mutable
+               -> Result<std::unique_ptr<ManifestWriter>> {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+      auto path =
+          std::format("{}/metadata/flt-{}.avro", table_location_, 
manifest_counter_++);
+      return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, 
spec, schema,
+                                        content);
+    };
+  }
+
+  ManifestFilterManager::PartitionSpecsById SpecsById(const TableMetadata& 
metadata) {
+    ManifestFilterManager::PartitionSpecsById specs_by_id;
+    for (const auto& spec : metadata.partition_specs) {
+      specs_by_id.emplace(spec->spec_id(), spec);
+    }
+    return specs_by_id;
+  }
+
+  // Read all entries from a list of ManifestFiles.
+  Result<std::vector<ManifestEntry>> ReadAllEntries(
+      const std::vector<ManifestFile>& manifests, const TableMetadata& 
metadata) {
+    std::vector<ManifestEntry> result;
+    for (const auto& m : manifests) {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(m.partition_spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(m, file_io_, schema, spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+      result.insert(result.end(), entries.begin(), entries.end());
+    }
+    return result;
+  }
+
+  static constexpr int64_t kTestSnapshotId = 55555L;
+  int manifest_counter_ = 0;
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<DataFile> file_a_;
+  std::shared_ptr<DataFile> file_b_;
+};
+
+TEST_F(ManifestFilterManagerTest, NullSnapshotReturnsEmpty) {
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, nullptr, 
factory));
+  EXPECT_TRUE(result.empty());
+}
+
+TEST_F(ManifestFilterManagerTest, ContainsDeletesReturnsCorrectState) {
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  EXPECT_FALSE(mgr.ContainsDeletes());
+  mgr.DeleteFile("/some/path.parquet");
+  EXPECT_TRUE(mgr.ContainsDeletes());
+}
+
+TEST_F(ManifestFilterManagerTest, DeleteByRowFilterRejectsNull) {
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  EXPECT_THAT(mgr.DeleteByRowFilter(nullptr), 
IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(ManifestFilterManagerTest, DeleteFileObjectRejectsNull) {
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  std::shared_ptr<DataFile> null_file;
+  EXPECT_THAT(mgr.DeleteFile(null_file), IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(ManifestFilterManagerTest, NoConditionsReturnsManifestsUnchanged) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  // Load original manifests so we can compare paths
+  ICEBERG_UNWRAP_OR_FAIL(auto list_reader,
+                         ManifestListReader::Make(snap->manifest_list, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto orig_manifests, list_reader->Files());
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ASSERT_EQ(result.size(), orig_manifests.size());
+  for (size_t i = 0; i < result.size(); ++i) {
+    // No rewrite → same manifest path
+    EXPECT_EQ(result[i].manifest_path, orig_manifests[i].manifest_path);
+  }
+}
+
+TEST_F(ManifestFilterManagerTest, DeleteFileByPath) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DeleteFile(file_a_->file_path);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  int deleted_count = 0;
+  int live_count = 0;
+  for (const auto& e : entries) {
+    if (e.status == ManifestStatus::kDeleted) {
+      ++deleted_count;
+      ASSERT_NE(e.data_file, nullptr);
+      EXPECT_EQ(e.data_file->file_path, file_a_->file_path);
+    } else {
+      ++live_count;
+    }
+  }
+  EXPECT_EQ(deleted_count, 1);
+  EXPECT_EQ(live_count, 1);
+}
+
+TEST_F(ManifestFilterManagerTest, ExplicitContextFilterManifestsDeletesByPath) 
{
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto list_reader,
+                         ManifestListReader::Make(snap->manifest_list, 
file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, list_reader->Files());
+  std::vector<const ManifestFile*> manifests;
+  manifests.reserve(manifest_files.size());
+  for (const auto& manifest : manifest_files) {
+    manifests.push_back(&manifest);
+  }
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DeleteFile(file_a_->file_path);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(schema_, 
SpecsById(*metadata),
+                                                          manifests, factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  int deleted_count = 0;
+  for (const auto& entry : entries) {
+    if (entry.status == ManifestStatus::kDeleted) {
+      ++deleted_count;
+      ASSERT_NE(entry.data_file, nullptr);
+      EXPECT_EQ(entry.data_file->file_path, file_a_->file_path);
+    }
+  }
+  EXPECT_EQ(deleted_count, 1);
+}
+
+TEST_F(ManifestFilterManagerTest, RowFilterAlwaysTrueDeletesAll) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysTrue()), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  for (const auto& e : entries) {
+    EXPECT_EQ(e.status, ManifestStatus::kDeleted) << "Expected all entries to 
be DELETED";
+  }
+}
+
+TEST_F(ManifestFilterManagerTest, RowFilterAlwaysFalseDeletesNone) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysFalse()), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  for (const auto& e : entries) {
+    // AlwaysFalse means nothing can match → entries remain ADDED or EXISTING
+    EXPECT_NE(e.status, ManifestStatus::kDeleted) << "Expected no entries to 
be DELETED";
+  }
+}
+
+TEST_F(ManifestFilterManagerTest, RowFilterUsesPartitionResiduals) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.CaseSensitive(false);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::Equal("X", 
Literal::Long(1L))), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  int deleted_count = 0;
+  int live_count = 0;
+  for (const auto& e : entries) {
+    ASSERT_NE(e.data_file, nullptr);
+    if (e.status == ManifestStatus::kDeleted) {
+      ++deleted_count;
+      EXPECT_EQ(e.data_file->file_path, file_a_->file_path);
+    } else {
+      ++live_count;
+      EXPECT_EQ(e.data_file->file_path, file_b_->file_path);
+    }
+  }
+
+  EXPECT_EQ(deleted_count, 1);
+  EXPECT_EQ(live_count, 1);
+  ASSERT_EQ(mgr.FilesToBeDeleted().size(), 1U);
+  EXPECT_EQ(mgr.FilesToBeDeleted().begin()->get()->file_path, 
file_a_->file_path);
+}
+
+TEST_F(ManifestFilterManagerTest, DropPartition) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  // Drop partition of file_a (partition_x = 1)
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DropPartition(spec_->spec_id(),
+                    PartitionValues(std::vector<Literal>{Literal::Long(1L)}));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  int deleted_count = 0;
+  for (const auto& e : entries) {
+    if (e.status == ManifestStatus::kDeleted) {
+      ++deleted_count;
+      ASSERT_TRUE(e.data_file != nullptr);
+      EXPECT_EQ(e.data_file->file_path, file_a_->file_path);
+    }
+  }
+  EXPECT_EQ(deleted_count, 1);
+}
+
+TEST_F(ManifestFilterManagerTest, FailMissingDeletePathsReturnsError) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DeleteFile("/does/not/exist.parquet");
+  mgr.FailMissingDeletePaths();
+
+  auto result = mgr.FilterManifests(*metadata, snap, factory);
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(ManifestFilterManagerTest, FailAnyDeleteReportsPartitionPath) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DeleteFile(file_a_->file_path);
+  mgr.FailAnyDelete();
+
+  auto result = mgr.FilterManifests(*metadata, snap, factory);
+  EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+  EXPECT_THAT(result, HasErrorMessage("x=1"));
+}
+
+TEST_F(ManifestFilterManagerTest, MultipleConditionsOrCombined) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  // Both files should be deleted: file_a by path, file_b by AlwaysTrue 
expression
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DeleteFile(file_a_->file_path);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysTrue()), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  for (const auto& e : entries) {
+    EXPECT_EQ(e.status, ManifestStatus::kDeleted);
+  }
+}
+
+TEST_F(ManifestFilterManagerTest, MultipleRowFiltersUseCombinedExpression) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::Equal("y", 
Literal::Long(7L))), IsOk());
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::Equal("x", 
Literal::Long(1L))), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, 
factory));
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+
+  ASSERT_EQ(entries.size(), 1U);
+  EXPECT_EQ(entries[0].status, ManifestStatus::kDeleted);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/manifest_merge_manager_test.cc 
b/src/iceberg/test/manifest_merge_manager_test.cc
new file mode 100644
index 00000000..b19eace8
--- /dev/null
+++ b/src/iceberg/test/manifest_merge_manager_test.cc
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_io_util.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+constexpr int8_t kFormatVersion = 2;
+constexpr int64_t kSnapshotId = 12345L;
+constexpr int32_t kSpecId0 = 0;
+constexpr int32_t kSpecId1 = 1;
+
+}  // namespace
+
+class ManifestMergeManagerTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    file_io_ = arrow::MakeMockFileIO();
+
+    // Simple schema: one long column
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(1, "x", int64()),
+    });
+    spec0_ = PartitionSpec::Make(kSpecId0,
+                                 {PartitionField(1, 1000, "x", 
Transform::Identity())})
+                 .value();
+    spec1_ = PartitionSpec::Make(
+                 kSpecId1, {PartitionField(1, 1001, "x_bucket", 
Transform::Bucket(8))})
+                 .value();
+
+    // Build minimal TableMetadata with both specs
+    auto builder = TableMetadataBuilder::BuildFromEmpty(kFormatVersion);
+    builder->SetCurrentSchema(schema_, schema_->HighestFieldId().value_or(0));
+    builder->SetDefaultPartitionSpec(spec0_);
+    builder->AddPartitionSpec(spec1_);
+    builder->SetDefaultSortOrder(SortOrder::Unsorted());
+    ICEBERG_UNWRAP_OR_FAIL(auto metadata, builder->Build());
+    metadata_ = std::shared_ptr<TableMetadata>(std::move(metadata));
+  }
+
+  // Write a small manifest with N data files and return the ManifestFile 
descriptor.
+  Result<ManifestFile> WriteManifest(int32_t spec_id, int num_files,
+                                     int64_t file_size_override = 512,
+                                     ManifestContent content = 
ManifestContent::kData) {
+    auto path = std::format("manifest-{}.avro", manifest_counter_++);
+    auto spec = spec_id == kSpecId0 ? spec0_ : spec1_;
+    ICEBERG_ASSIGN_OR_RAISE(auto writer,
+                            ManifestWriter::MakeWriter(kFormatVersion, 
kSnapshotId, path,
+                                                       file_io_, spec, 
schema_, content));
+    for (int i = 0; i < num_files; ++i) {
+      auto f = std::make_shared<DataFile>();
+      f->content = (content == ManifestContent::kDeletes)
+                       ? DataFile::Content::kPositionDeletes
+                       : DataFile::Content::kData;
+      f->file_path = std::format("data/file-{}-{}.parquet", manifest_counter_, 
i);
+      f->file_format = FileFormatType::kParquet;
+      // Identity spec uses LONG partition values; Bucket spec uses INT
+      Literal part_val = (spec_id == kSpecId0) ? Literal::Long(i) : 
Literal::Int(i % 8);
+      f->partition = PartitionValues(std::vector<Literal>{part_val});
+      f->file_size_in_bytes = 1024;
+      f->record_count = 10;
+      f->partition_spec_id = spec_id;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(f));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifest_file, writer->ToManifestFile());
+    // Override length so we can control bin-packing behaviour in tests
+    manifest_file.manifest_length = file_size_override;
+    return manifest_file;
+  }
+
+  ManifestWriterFactory MakeWriterFactory() {
+    return [this](int32_t spec_id,
+                  ManifestContent content) -> 
Result<std::unique_ptr<ManifestWriter>> {
+      ++factory_call_count_;
+      auto spec = spec_id == kSpecId0 ? spec0_ : spec1_;
+      auto path = std::format("merged-{}.avro", manifest_counter_++);
+      return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, path, 
file_io_, spec,
+                                        schema_, content);
+    };
+  }
+
+  // Count total entries across all manifests.
+  Result<int> CountEntries(const std::vector<ManifestFile>& manifests) {
+    int total = 0;
+    for (const auto& m : manifests) {
+      auto spec = m.partition_spec_id == kSpecId0 ? spec0_ : spec1_;
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(m, file_io_, schema_, 
spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+      total += static_cast<int>(entries.size());
+    }
+    return total;
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> spec0_;
+  std::shared_ptr<PartitionSpec> spec1_;
+  std::shared_ptr<TableMetadata> metadata_;
+  int manifest_counter_ = 0;
+  int factory_call_count_ = 0;
+};
+
+TEST_F(ManifestMergeManagerTest, MergeDisabled) {
+  ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1));
+  ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1));
+  ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, 
/*enabled=*/false);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({m0, m1}, {m2}, kSnapshotId, *metadata_, 
file_io_,
+                                      MakeWriterFactory()));
+  // merge disabled → all 3 manifests returned, factory never called
+  EXPECT_EQ(result.size(), 3U);
+  EXPECT_EQ(factory_call_count_, 0);
+}
+
+TEST_F(ManifestMergeManagerTest, BelowMinCountThreshold) {
+  ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1));
+  ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1));
+
+  // min_count=3, only 2 manifests total → no merge
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(auto result,
+                         mgr.MergeManifests({m0}, {m1}, kSnapshotId, 
*metadata_, file_io_,
+                                            MakeWriterFactory()));
+  EXPECT_EQ(result.size(), 2U);
+  EXPECT_EQ(factory_call_count_, 0);
+}
+
+TEST_F(ManifestMergeManagerTest, MergeOccursAtThreshold) {
+  // 3 small manifests (each 100 bytes), target=1024 → all fit in one bin
+  ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({m0, m1}, {m2}, kSnapshotId, *metadata_, 
file_io_,
+                                      MakeWriterFactory()));
+  // All 3 merged into 1 manifest (total 3 entries)
+  EXPECT_EQ(result.size(), 1U);
+  ICEBERG_UNWRAP_OR_FAIL(auto count1, CountEntries(result));
+  EXPECT_EQ(count1, 3);
+}
+
+TEST_F(ManifestMergeManagerTest, OversizedManifestPassedThrough) {
+  // m_large exceeds target → must not be merged; m_small fits
+  ICEBERG_UNWRAP_OR_FAIL(auto m_large, WriteManifest(kSpecId0, 2, 
/*size=*/2000));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_small, WriteManifest(kSpecId0, 1, 
/*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_small2, WriteManifest(kSpecId0, 1, 
/*size=*/100));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(auto result,
+                         mgr.MergeManifests({m_large, m_small}, {m_small2}, 
kSnapshotId,
+                                            *metadata_, file_io_, 
MakeWriterFactory()));
+  // m_large is oversized and acts as a bin boundary — the two small manifests 
on either
+  // side of it are never merged together.  m_small2 (the newest) is also 
protected by
+  // minCountToMerge (size 1 < 2).  All three remain separate.
+  EXPECT_EQ(result.size(), 3U);
+  ICEBERG_UNWRAP_OR_FAIL(auto count2, CountEntries(result));
+  EXPECT_EQ(count2, 4);  // 2 + 1 + 1
+}
+
+TEST_F(ManifestMergeManagerTest, CrossSpecManifestsNotMerged) {
+  // Manifests with different spec IDs must never be merged together
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec0a, WriteManifest(kSpecId0, 1, 
/*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec0b, WriteManifest(kSpecId0, 1, 
/*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec1a, WriteManifest(kSpecId1, 1, 
/*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec1b, WriteManifest(kSpecId1, 1, 
/*size=*/100));
+
+  // With 4 manifests (target large enough for each pair), we get 2 merged 
outputs
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result,
+      mgr.MergeManifests({m_spec0a, m_spec1a}, {m_spec0b, m_spec1b}, 
kSnapshotId,
+                         *metadata_, file_io_, MakeWriterFactory()));
+  EXPECT_EQ(result.size(), 2U);
+  // Verify spec IDs are preserved per output manifest
+  for (const auto& m : result) {
+    EXPECT_THAT(m.partition_spec_id, ::testing::AnyOf(kSpecId0, kSpecId1));
+  }
+}
+
+TEST_F(ManifestMergeManagerTest, WriterFactoryCalledOncePerMergedManifest) {
+  // 4 small manifests in two groups → 2 merged outputs → factory called twice
+  ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId1, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m3, WriteManifest(kSpecId1, 1, /*size=*/100));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(auto result,
+                         mgr.MergeManifests({m0, m2}, {m1, m3}, kSnapshotId, 
*metadata_,
+                                            file_io_, MakeWriterFactory()));
+  EXPECT_EQ(result.size(), 2U);
+  EXPECT_EQ(factory_call_count_, 2);
+}
+
+TEST_F(ManifestMergeManagerTest, MixedContentManifestsNotMerged) {
+  // Data and delete manifests sharing the same spec_id must never be merged 
together.
+  // The grouping key is (spec_id, content), so they land in separate bins.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto d0, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto d1, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({d0, del0}, {d1, del1}, kSnapshotId, 
*metadata_,
+                                      file_io_, MakeWriterFactory()));
+  // 2 data → 1 merged data manifest; 2 delete → 1 merged delete manifest
+  EXPECT_EQ(result.size(), 2U);
+  int data_count = 0;
+  int delete_count = 0;
+  for (const auto& m : result) {
+    if (m.content == ManifestContent::kData) ++data_count;
+    if (m.content == ManifestContent::kDeletes) ++delete_count;
+  }
+  EXPECT_EQ(data_count, 1);
+  EXPECT_EQ(delete_count, 1);
+}
+
+TEST_F(ManifestMergeManagerTest, MixedContentUsesFirstManifestPerContent) {
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto d0, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto d1, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+
+  // Each content type's newest manifest must be protected by the threshold
+  // independently.
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({d0, del0}, {d1, del1}, kSnapshotId, 
*metadata_,
+                                      file_io_, MakeWriterFactory()));
+
+  // Each content type has exactly two manifests, below min_count=3, so 
neither pair
+  // should be merged.
+  ASSERT_EQ(result.size(), 4U);
+  int data_count = 0;
+  int delete_count = 0;
+  for (const auto& manifest : result) {
+    if (manifest.content == ManifestContent::kData) {
+      ++data_count;
+    } else if (manifest.content == ManifestContent::kDeletes) {
+      ++delete_count;
+    }
+  }
+  EXPECT_EQ(data_count, 2);
+  EXPECT_EQ(delete_count, 2);
+}
+
+TEST_F(ManifestMergeManagerTest, DeleteManifestsMerged) {
+  // Delete manifests are bin-packed and merged just like data manifests.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del2, WriteManifest(kSpecId0, 1, /*size=*/100, 
ManifestContent::kDeletes));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(auto result,
+                         mgr.MergeManifests({del0, del1}, {del2}, kSnapshotId, 
*metadata_,
+                                            file_io_, MakeWriterFactory()));
+  EXPECT_EQ(result.size(), 1U);
+  EXPECT_EQ(result[0].content, ManifestContent::kDeletes);
+  ICEBERG_UNWRAP_OR_FAIL(auto count, CountEntries(result));
+  EXPECT_EQ(count, 3);
+}
+
+TEST_F(ManifestMergeManagerTest, PackEndOlderManifestsMergedNotNewest) {
+  // packEnd semantics: for [m0_new, m1_old, m2_old] with target=250 (pairs 
fit but
+  // triples don't), packing from the end merges m1+m2 (the older pair) and 
leaves
+  // m0 (the newest) in its own under-filled bin at the front of the output.
+  // This is the opposite of naive forward packing, which would merge m0+m1.
+  ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100));
+
+  // target=250 fits two 100-byte manifests but not three.
+  // min_count=3 so m0's single-element bin is kept as-is (below threshold).
+  ManifestMergeManager mgr(/*target=*/250, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({m1, m2}, {m0}, kSnapshotId, *metadata_, 
file_io_,
+                                      MakeWriterFactory()));
+  // Expected: [m0 (pass-through), merged(m1+m2)]
+  ASSERT_EQ(result.size(), 2U);
+  // First output is the newest manifest m0, passed through unchanged 
(under-filled bin).
+  EXPECT_EQ(result[0].manifest_length, m0.manifest_length);
+  // Second output is the merged older pair — it must be a newly written 
manifest
+  // (different path than either original).
+  EXPECT_NE(result[1].manifest_path, m1.manifest_path);
+  EXPECT_NE(result[1].manifest_path, m2.manifest_path);
+  ICEBERG_UNWRAP_OR_FAIL(auto count, CountEntries(result));
+  EXPECT_EQ(count, 3);
+}
+
+}  // namespace iceberg

Reply via email to