This is an automated email from the ASF dual-hosted git repository.
wgtmac pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new bf262180 feat(manifest): add ManifestFilterManager and
ManifestMergeManager (#652)
bf262180 is described below
commit bf26218098778356b9f8d703478ddd2235d7cc68
Author: Guotao Yu <[email protected]>
AuthorDate: Wed May 20 14:19:01 2026 +0800
feat(manifest): add ManifestFilterManager and ManifestMergeManager (#652)
Implement two manifest management classes for table write operations:
- ManifestFilterManager: filters manifest entries by row filter
expression, file path, or partition value; supports
FailMissingDeletePaths validation. Rewrites manifests that contain
matching files, marking entries as DELETED; passes through manifests
that cannot contain matching files unchanged.
- ManifestMergeManager: merges small manifests using greedy bin-packing,
grouping by partition_spec_id (manifests with different specs are never
merged). Oversized manifests pass through unchanged. ADDED entries from
prior manifests become EXISTING when merged (matching Java semantics).
---
src/iceberg/CMakeLists.txt | 2 +
src/iceberg/manifest/manifest_filter_manager.cc | 423 +++++++++++++++++++++++
src/iceberg/manifest/manifest_filter_manager.h | 229 ++++++++++++
src/iceberg/manifest/manifest_merge_manager.cc | 191 ++++++++++
src/iceberg/manifest/manifest_merge_manager.h | 114 ++++++
src/iceberg/manifest/manifest_writer.h | 5 +
src/iceberg/manifest/meson.build | 2 +
src/iceberg/meson.build | 2 +
src/iceberg/schema.cc | 2 +
src/iceberg/schema.h | 2 +
src/iceberg/test/CMakeLists.txt | 2 +
src/iceberg/test/manifest_filter_manager_test.cc | 382 ++++++++++++++++++++
src/iceberg/test/manifest_merge_manager_test.cc | 355 +++++++++++++++++++
13 files changed, 1711 insertions(+)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 79298b1a..68cacebe 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -45,8 +45,10 @@ set(ICEBERG_SOURCES
location_provider.cc
manifest/manifest_adapter.cc
manifest/manifest_entry.cc
+ manifest/manifest_filter_manager.cc
manifest/manifest_group.cc
manifest/manifest_list.cc
+ manifest/manifest_merge_manager.cc
manifest/manifest_reader.cc
manifest/manifest_util.cc
manifest/manifest_writer.cc
diff --git a/src/iceberg/manifest/manifest_filter_manager.cc
b/src/iceberg/manifest/manifest_filter_manager.cc
new file mode 100644
index 00000000..086c94a7
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.cc
@@ -0,0 +1,423 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/expression/manifest_evaluator.h"
+#include "iceberg/expression/residual_evaluator.h"
+#include "iceberg/expression/strict_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+using PartitionSpecsById = ManifestFilterManager::PartitionSpecsById;
+
+bool HasRowFilterExpression(const std::shared_ptr<Expression>& expr) {  // Helper: tells whether a usable row filter is present.
+ return expr != nullptr && expr->op() != Expression::Operation::kFalse;  // null or kFalse counts as "no filter"; kFalse is presumably the op of the AlwaysFalse() default set in the constructor
+}
+
+Result<std::shared_ptr<PartitionSpec>> PartitionSpecById(  // Helper: looks up spec_id in specs_by_id.
+ const PartitionSpecsById& specs_by_id, int32_t spec_id) {
+ auto iter = specs_by_id.find(spec_id);
+ if (iter == specs_by_id.end() || iter->second == nullptr) {  // a null mapped spec is reported the same way as a missing ID
+ return NotFound("Partition spec with ID {} is not found", spec_id);
+ }
+ return iter->second;
+}
+
+Result<std::string> FormatPartitionPath(const PartitionSpecsById& specs_by_id,  // Helper: renders file.partition as a path string.
+ const DataFile& file, int32_t spec_id)
{
+ ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id));  // a failed spec lookup is handed back to the caller
+ return spec->PartitionPath(file.partition);  // only used in this file to build the "would delete existing data" error text
+}
+
+} // namespace
+
+ManifestFilterManager::ManifestFilterManager(ManifestContent content,
+ std::shared_ptr<FileIO> file_io)
+ : manifest_content_(content),
+ file_io_(std::move(file_io)),
+ delete_expr_(Expressions::AlwaysFalse()) {}  // start from a match-nothing filter; DeleteByRowFilter() ORs real filters into it
+
+ManifestFilterManager::~ManifestFilterManager() = default;  // NOTE(review): presumably defined out of line so the unique_ptr evaluator caches are destroyed where the evaluator types are complete - confirm
+
+Status ManifestFilterManager::DeleteByRowFilter(std::shared_ptr<Expression>
expr) {
+ ICEBERG_PRECHECK(expr != nullptr, "Cannot delete files using filter: null");
+ ICEBERG_ASSIGN_OR_RAISE(delete_expr_, Or::MakeFolded(delete_expr_,
std::move(expr)));  // accumulate: the new filter is OR-ed with everything registered so far
+ manifest_evaluator_cache_.clear();  // cached evaluators were built from the previous delete_expr_
+ residual_evaluator_cache_.clear();
+ return {};
+}
+
+void ManifestFilterManager::CaseSensitive(bool case_sensitive) {
+ case_sensitive_ = case_sensitive;
+ manifest_evaluator_cache_.clear();  // evaluators are created with case_sensitive_, so cached ones are dropped
+ residual_evaluator_cache_.clear();
+}
+
+void ManifestFilterManager::DeleteFile(std::string_view path) {
+ delete_paths_.insert(std::string(path));  // store an owning copy of the viewed characters; set semantics collapse repeated paths
+}
+
+Status ManifestFilterManager::DeleteFile(std::shared_ptr<DataFile> file) {
+ ICEBERG_PRECHECK(file != nullptr, "Cannot delete file: null");
+ delete_paths_.insert(file->file_path);  // matching during filtering is by path (see ShouldDelete); read before file is moved from
+ delete_files_.insert(std::move(file));  // keep the object itself so FilesToBeDeleted() can return it
+ return {};
+}
+
+const DataFileSet& ManifestFilterManager::FilesToBeDeleted() const {
+ return delete_files_;  // filled by DeleteFile(DataFile) and by FilterManifestWithDeletedFiles() while manifests are rewritten
+}
+
+void ManifestFilterManager::DropPartition(int32_t spec_id, PartitionValues
partition) {
+ drop_partitions_.add(spec_id, std::move(partition));  // recorded as a (spec_id, partition) pair; ShouldDelete() looks it up with contains()
+}
+
+void ManifestFilterManager::FailMissingDeletePaths() {
+ fail_missing_delete_paths_ = true;  // consulted by ValidateRequiredDeletes(); there is no way to reset it in this class
+}
+
+void ManifestFilterManager::FailAnyDelete() { fail_any_delete_ = true; }  // makes ShouldDelete() return an error instead of true when an entry matches
+
+bool ManifestFilterManager::ContainsDeletes() const {
+ return HasRowFilterExpression(delete_expr_) || !delete_paths_.empty() ||  // any of the three delete sources: row filter, exact paths,
+ !drop_partitions_.empty();  // or dropped partitions (file-object deletes are covered by delete_paths_)
+}
+
+Result<bool> ManifestFilterManager::CanContainDroppedFiles(const
ManifestFile&) const {
+ // TODO(Guotao): Use the manifest descriptor to skip unrelated object-delete
+ // manifests once object-delete partitions are tracked separately.
+ // Currently, DeleteFile(std::shared_ptr<DataFile>) degrades to a path-based
delete,
+ // which forces scanning all manifests.
+ return !delete_paths_.empty();  // the manifest argument is unused: any registered path could be in any manifest
+}
+
+Result<bool> ManifestFilterManager::CanContainDroppedPartitions(
+ const ManifestFile& manifest) const {
+ if (drop_partitions_.empty()) return false;
+ // TODO(Guotao): Use partition_summaries bounds to skip manifests that cannot
+ // contain any dropped partition, instead of only matching partition spec
IDs.
+ // Only manifests whose partition spec matches a registered drop can contain
+ // entries for that partition. PartitionKey is pair<spec_id, values>.
+ int32_t spec_id = manifest.partition_spec_id;
+ for (const auto& key : drop_partitions_) {  // linear scan over every registered (spec_id, partition) key
+ if (key.first == spec_id) return true;  // partition values are not compared here, only the spec ID
+ }
+ return false;
+}
+
+Result<bool> ManifestFilterManager::CanContainExpressionDeletes(
+ const ManifestFile& manifest, const std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById& specs_by_id) {
+ if (!HasRowFilterExpression(delete_expr_)) return false;  // no row filter registered, so nothing can match by expression
+ int32_t spec_id = manifest.partition_spec_id;
+ ICEBERG_ASSIGN_OR_RAISE(auto* evaluator,
+ GetManifestEvaluator(schema, specs_by_id, spec_id));  // evaluator is cached per spec ID and owned by the cache
+ return evaluator->Evaluate(manifest);  // evaluated against the manifest descriptor only; the manifest file is not read here
+}
+
+Result<bool> ManifestFilterManager::CanContainDeletedFiles(
+ const ManifestFile& manifest, const std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById& specs_by_id, bool trust_manifest_references) {
+ // A manifest with no live files cannot contain files to delete.
+ // Missing counts mean the count is unknown; treat it as possibly non-zero.
+ bool has_live = !manifest.added_files_count.has_value() ||
+ manifest.added_files_count.value() > 0 ||
+ !manifest.existing_files_count.has_value() ||
+ manifest.existing_files_count.value() > 0;  // "live" here means ADDED or EXISTING entries
+ if (!has_live) return false;
+
+ if (trust_manifest_references) {
+ // TODO(Guotao): Return whether this manifest is in the referenced
manifest set.
+ return true;  // conservative placeholder until the referenced-manifest set exists
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto can_contain_dropped_files,
+ CanContainDroppedFiles(manifest));  // checks run in order and the first positive answer wins
+ if (can_contain_dropped_files) return true;
+
+ ICEBERG_ASSIGN_OR_RAISE(auto can_contain_expression_deletes,
+ CanContainExpressionDeletes(manifest, schema,
specs_by_id));
+ if (can_contain_expression_deletes) return true;
+
+ return CanContainDroppedPartitions(manifest);
+}
+
+Result<ManifestEvaluator*> ManifestFilterManager::GetManifestEvaluator(
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ int32_t spec_id) {
+ auto& evaluator = manifest_evaluator_cache_[spec_id];  // operator[] inserts an empty slot on first use of this spec ID
+ if (!evaluator) {  // an empty slot (first use, or an earlier failed build) triggers construction
+ ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id,
spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(evaluator, ManifestEvaluator::MakeRowFilter(
+ delete_expr_, spec, *schema,
case_sensitive_));  // the result is stored straight into the cache slot through the reference
+ }
+ return evaluator.get();  // non-owning pointer; it dies when the cache is cleared
+}
+
+Result<ResidualEvaluator*> ManifestFilterManager::GetResidualEvaluator(
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ int32_t spec_id) {
+ auto& evaluator = residual_evaluator_cache_[spec_id];  // operator[] inserts an empty slot on first use of this spec ID
+ if (!evaluator) {  // an empty slot (first use, or an earlier failed build) triggers construction
+ ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id,
spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(evaluator, ResidualEvaluator::Make(delete_expr_,
*spec,
+ *schema,
case_sensitive_));  // the result is stored straight into the cache slot through the reference
+ }
+ return evaluator.get();  // non-owning pointer; it dies when the cache is cleared
+}
+
+Result<bool> ManifestFilterManager::ShouldDelete(const ManifestEntry& entry,
+ const
std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById&
specs_by_id,
+ int32_t manifest_spec_id) {
+ if (!entry.data_file) return false;  // an entry without a data file is never selected for deletion
+ const DataFile& file = *entry.data_file;
+ int32_t spec_id = file.partition_spec_id.value_or(manifest_spec_id);  // the file's own spec ID wins; the manifest's is the fallback
+
+ // Path-based and partition-drop checks
+ if (delete_paths_.count(file.file_path) ||
+ drop_partitions_.contains(spec_id, file.partition)) {
+ if (fail_any_delete_) {  // FailAnyDelete(): a match is an error rather than a deletion
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_path,
+ FormatPartitionPath(specs_by_id, file, spec_id));
+ return InvalidArgument("Operation would delete existing data: {}",
partition_path);
+ }
+ return true;
+ }
+
+ if (HasRowFilterExpression(delete_expr_)) {
+ ICEBERG_ASSIGN_OR_RAISE(auto* residual_eval,
+ GetResidualEvaluator(schema, specs_by_id,
spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(auto residual_expr,
+ residual_eval->ResidualFor(file.partition));  // the filter reduced for this file's partition values
+ // TODO(Guotao): Cache strict/inclusive metrics evaluators per partition
residual.
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto strict_eval,
+ StrictMetricsEvaluator::Make(residual_expr, schema, case_sensitive_));
+ ICEBERG_ASSIGN_OR_RAISE(auto strict_match, strict_eval->Evaluate(file));
+ if (strict_match) {  // presumably: metrics prove every row matches, so the whole file can go
+ if (fail_any_delete_) {
+ ICEBERG_ASSIGN_OR_RAISE(auto partition_path,
+ FormatPartitionPath(specs_by_id, file,
spec_id));
+ return InvalidArgument("Operation would delete existing data: {}",
+ partition_path);
+ }
+ return true;
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto incl_eval, InclusiveMetricsEvaluator::Make(
+ residual_expr, *schema,
case_sensitive_));
+ ICEBERG_ASSIGN_OR_RAISE(auto incl_match, incl_eval->Evaluate(file));
+ if (incl_match) {  // not a strict match, but some rows may match: a partial match
+ if (manifest_content_ == ManifestContent::kDeletes) {
+ return false;  // delete manifests keep a partially matching file instead of failing
+ }
+ return InvalidArgument(
+ "Cannot delete file where some, but not all, rows match filter: {}",
+ file.file_path);
+ }
+ }
+
+ return false;
+}
+
+bool ManifestFilterManager::CanTrustManifestReferences(
+ const std::vector<const ManifestFile*>&) const {
+ // TODO(Guotao): Track source manifest locations for object deletes so
manifests
+ // outside the referenced set can be skipped before any other delete checks.
+ return false;  // placeholder: the argument is ignored, so the shortcut in CanContainDeletedFiles() is never taken
+}
+
+Result<ManifestFile> ManifestFilterManager::FilterManifest(
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ const ManifestFile& manifest, bool trust_manifest_references,
+ const ManifestWriterFactory& writer_factory,
+ std::unordered_set<std::string>& found_paths) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto can_contain_deleted_files,
+ CanContainDeletedFiles(manifest, schema, specs_by_id,
trust_manifest_references));
+ if (!can_contain_deleted_files) {
+ return manifest;  // descriptor-level check ruled the manifest out; it is returned without being read
+ }
+
+ int32_t spec_id = manifest.partition_spec_id;
+ ICEBERG_ASSIGN_OR_RAISE(auto spec, PartitionSpecById(specs_by_id, spec_id));
+ ICEBERG_ASSIGN_OR_RAISE(auto reader,
+ ManifestReader::Make(manifest, file_io_, schema,
spec));
+ ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->LiveEntries());  // from here on the manifest contents are loaded
+
+ ICEBERG_ASSIGN_OR_RAISE(auto has_deleted_files,
+ ManifestHasDeletedFiles(entries, schema,
specs_by_id, spec_id));  // first pass: decide whether a rewrite is needed at all
+ if (!has_deleted_files) {
+ return manifest;  // read but nothing matched: keep the original descriptor, no new manifest is written
+ }
+
+ return FilterManifestWithDeletedFiles(entries, spec_id, schema, specs_by_id,
+ writer_factory, found_paths);  // second pass re-evaluates ShouldDelete() per entry while writing
+}
+
+Result<bool> ManifestFilterManager::ManifestHasDeletedFiles(
+ const std::vector<ManifestEntry>& entries, const std::shared_ptr<Schema>&
schema,
+ const PartitionSpecsById& specs_by_id, int32_t manifest_spec_id) {
+ for (const auto& entry : entries) {
+ ICEBERG_ASSIGN_OR_RAISE(auto should_delete,
+ ShouldDelete(entry, schema, specs_by_id,
manifest_spec_id));  // errors from ShouldDelete() (e.g. FailAnyDelete, partial match) surface here first
+ if (should_delete) {
+ return true;  // stop at the first match; later entries are not examined in this pass
+ }
+ }
+ return false;
+}
+
+Result<ManifestFile> ManifestFilterManager::FilterManifestWithDeletedFiles(
+ const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ const ManifestWriterFactory& writer_factory,
+ std::unordered_set<std::string>& found_paths) {
+ ICEBERG_ASSIGN_OR_RAISE(auto writer,
+ writer_factory(manifest_spec_id, manifest_content_));  // the replacement manifest keeps the source manifest's spec ID and this manager's content type
+ for (const auto& entry : entries) {
+ ICEBERG_ASSIGN_OR_RAISE(auto should_delete,
+ ShouldDelete(entry, schema, specs_by_id,
manifest_spec_id));
+ if (should_delete) {
+ if (entry.data_file && delete_paths_.count(entry.data_file->file_path)) {
+ found_paths.insert(entry.data_file->file_path);  // only path-registered deletes are recorded, for ValidateRequiredDeletes()
+ }
+ if (entry.data_file) {
+ // TODO(Guotao): Track duplicate deletes and avoid full DataFile
copies when
+ // summary generation can use lighter records.
+ delete_files_.insert(std::make_shared<DataFile>(*entry.data_file));  // a copy of the file record is exposed through FilesToBeDeleted()
+ }
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+ } else {
+ ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));  // surviving entries are carried over via WriteExistingEntry()
+ }
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(writer->Close());
+ return writer->ToManifestFile();  // descriptor of the newly written manifest
+}
+
+Status ManifestFilterManager::ValidateRequiredDeletes(
+ const std::unordered_set<std::string>& found_paths) const {
+ if (!fail_missing_delete_paths_) {
+ return {};  // validation is opt-in via FailMissingDeletePaths()
+ }
+
+ std::string missing;
+ for (const auto& path : delete_paths_) {  // unordered_set iteration: the order of paths in the message is unspecified
+ if (!found_paths.count(path)) {
+ if (!missing.empty()) missing += ", ";
+ missing += path;
+ }
+ }
+ if (!missing.empty()) {
+ return InvalidArgument("Missing delete paths: {}", missing);  // one error listing every registered path that was not found
+ }
+ return {};
+}
+
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+ const TableMetadata& metadata, const std::shared_ptr<Snapshot>&
base_snapshot,
+ const ManifestWriterFactory& writer_factory) {
+ if (!base_snapshot) {
+ ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes({}));  // nothing can be found without a snapshot, so required paths (if enforced) are all missing
+ return std::vector<ManifestFile>{};
+ }
+
+ ICEBERG_PRECHECK(file_io_ != nullptr, "Cannot filter manifests: FileIO is
null");
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto list_reader, ManifestListReader::Make(base_snapshot->manifest_list,
file_io_));
+ ICEBERG_ASSIGN_OR_RAISE(auto all_manifests, list_reader->Files());
+
+ std::vector<const ManifestFile*> manifests;
+ manifests.reserve(all_manifests.size());
+ for (auto& manifest : all_manifests) {
+ manifests.push_back(&manifest);  // pointers into all_manifests, which stays alive until this function returns
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+ TableMetadataCache metadata_cache(&metadata);
+ ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id,
metadata_cache.GetPartitionSpecsById());  // NOTE(review): .get() below suggests a reference wrapper into metadata_cache - confirm
+
+ return FilterManifests(schema, specs_by_id.get(), manifests, writer_factory);  // the pointer-based overload does the actual filtering
+}
+
+Result<std::vector<ManifestFile>> ManifestFilterManager::FilterManifests(
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ const std::vector<const ManifestFile*>& input_manifests,
+ const ManifestWriterFactory& writer_factory) {
+ ICEBERG_PRECHECK(schema != nullptr, "Cannot filter manifests: schema is
null");
+ ICEBERG_PRECHECK(file_io_ != nullptr, "Cannot filter manifests: FileIO is
null");
+
+ std::vector<const ManifestFile*> manifests;
+ manifests.reserve(input_manifests.size());
+ for (const auto* manifest : input_manifests) {
+ ICEBERG_PRECHECK(manifest != nullptr, "Cannot filter manifests: manifest
is null");
+ if (manifest->content == manifest_content_) {  // manifests of the other content type are left out of the result, not passed through
+ manifests.push_back(manifest);
+ }
+ }
+
+ std::unordered_set<std::string> found_paths;  // registered delete paths actually seen while rewriting
+ if (manifests.empty()) {
+ ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));  // still enforce FailMissingDeletePaths() when there is nothing to scan
+ return std::vector<ManifestFile>{};
+ }
+
+ bool trust_manifest_references = CanTrustManifestReferences(manifests);
+ manifest_evaluator_cache_.clear();  // presumably because cached evaluators were built for a previous call's schema/specs - confirm
+ residual_evaluator_cache_.clear();
+
+ // TODO(Guotao): Parallelize manifest filtering with per-manifest results,
then
+ // merge found paths and deleted files after the loop.
+ std::vector<ManifestFile> filtered;
+ filtered.reserve(manifests.size());
+ for (const auto* manifest_ptr : manifests) {  // output order follows input order
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto filtered_manifest,
+ FilterManifest(schema, specs_by_id, *manifest_ptr,
trust_manifest_references,
+ writer_factory, found_paths));
+ filtered.push_back(std::move(filtered_manifest));
+ }
+
+ ICEBERG_RETURN_UNEXPECTED(ValidateRequiredDeletes(found_paths));  // checked only after every manifest has been processed
+ return filtered;
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_filter_manager.h
b/src/iceberg/manifest/manifest_filter_manager.h
new file mode 100644
index 00000000..55258b2b
--- /dev/null
+++ b/src/iceberg/manifest/manifest_filter_manager.h
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_filter_manager.h
+/// Filters an existing snapshot's manifest list, marking data files as DELETED
+/// or EXISTING based on row-filter expressions, exact path deletes, and
partition drops.
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/data_file_set.h"
+#include "iceberg/util/partition_value_util.h"
+
+namespace iceberg {
+
+/// \brief Filters an existing snapshot's manifest list.
+///
+/// The manager accumulates delete conditions incrementally, then applies them
all
+/// at once in a single FilterManifests() call. Manifests that contain no
deleted
+/// entries are returned unchanged (a manifest is only read when it may match a delete condition). Manifests that do contain deleted
+/// entries are rewritten with those entries marked DELETED.
+///
+/// The manager is content-agnostic: pass ManifestContent::kData to process
data
+/// manifests, or ManifestContent::kDeletes to process delete manifests.
+///
+/// TODO(Guotao): For ManifestContent::kDeletes, implement cleanup for orphan
delete files
+/// and dangling deletion vectors.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestFilterManager {
+ public:
+ using PartitionSpecsById = std::unordered_map<int32_t,
std::shared_ptr<PartitionSpec>>;
+
+ ManifestFilterManager(ManifestContent content, std::shared_ptr<FileIO>
file_io);
+ ~ManifestFilterManager();
+
+ ManifestFilterManager(const ManifestFilterManager&) = delete;
+ ManifestFilterManager& operator=(const ManifestFilterManager&) = delete;
+
+ /// \brief Register a row-filter expression.
+ ///
+ /// An entry is marked DELETED only when its column metrics show that every row of the file matches the
+ /// expression. When only some rows may match, FilterManifests() fails for data manifests and keeps the entry for delete manifests.
+ ///
+ /// \param expr The expression to match files against (must not be null); it is OR-ed with previously registered expressions
+ Status DeleteByRowFilter(std::shared_ptr<Expression> expr);
+
+ /// \brief Set whether row-filter field binding is case-sensitive.
+ void CaseSensitive(bool case_sensitive);
+
+ /// \brief Register an exact file path for deletion.
+ ///
+ /// Any manifest entry whose file_path matches this path will be marked
DELETED.
+ ///
+ /// \param path The exact file path to delete
+ void DeleteFile(std::string_view path);
+
+ /// \brief Register a file object for deletion.
+ ///
+ /// Any manifest entry whose file_path matches file->file_path will be marked
+ /// DELETED. The file object is retained in FilesToBeDeleted(), allowing
callers
+ /// to enumerate deleted file objects for follow-up delete-file cleanup.
+ /// Duplicate registrations (same path) are silently ignored.
+ ///
+ /// \param file The data/delete file to delete (must not be null)
+ Status DeleteFile(std::shared_ptr<DataFile> file);
+
+ /// \brief Returns the set of file objects marked for deletion by this
manager.
+ ///
+ /// This includes files registered via DeleteFile(DataFile) and files
discovered
+ /// during FilterManifests() that were deleted by path, partition, or
row-filter
+ /// matching. Used by higher-level operations (e.g. RowDelta) to enumerate
the
+ /// deleted data files for delete-file cleanup.
+ const DataFileSet& FilesToBeDeleted() const;
+
+ /// \brief Register a partition for dropping.
+ ///
+ /// Any manifest entry whose (spec_id, partition) pair matches will be
marked DELETED.
+ ///
+ /// \param spec_id The partition spec ID
+ /// \param partition The partition values to drop
+ void DropPartition(int32_t spec_id, PartitionValues partition);
+
+ /// \brief Set a flag that makes FilterManifests() fail if any registered
+ /// delete path was not found in any manifest entry.
+ void FailMissingDeletePaths();
+
+ /// \brief Set a flag that makes FilterManifests() return an error if any
+ /// manifest entry matches a delete condition.
+ void FailAnyDelete();
+
+ /// \brief Returns true if any delete condition has been registered.
+ bool ContainsDeletes() const;
+
+ /// \brief Apply all accumulated delete conditions to the base snapshot's
manifests.
+ ///
+ /// Manifests that cannot possibly contain deleted files are returned
unchanged.
+ /// Manifests that do contain deleted files are rewritten using
writer_factory.
+ ///
+ /// \param metadata Table metadata (provides specs and schema for evaluators)
+ /// \param base_snapshot The snapshot whose manifests to filter (may be null, in which case the result is empty)
+ /// \param writer_factory Factory to create new ManifestWriter instances
+ /// \return The filtered manifests whose content matches this manager's ManifestContent, or an error
+ Result<std::vector<ManifestFile>> FilterManifests(
+ const TableMetadata& metadata, const std::shared_ptr<Snapshot>&
base_snapshot,
+ const ManifestWriterFactory& writer_factory);
+
+ /// \brief Apply all accumulated delete conditions to the provided manifests.
+ ///
+ /// This overload accepts only the context needed for filtering. It is
intended for
+ /// callers that already have the active schema, partition specs, and
manifest list.
+ ///
+ /// \param schema Active schema to bind row-filter expressions and metrics
evaluators
+ /// \param specs_by_id All partition specs keyed by spec ID
+ /// \param manifests Manifest descriptors to filter (none may be null; descriptors of a different content type are skipped)
+ /// \param writer_factory Factory to create new ManifestWriter instances
+ /// \return The filtered manifests whose content matches this manager's ManifestContent, or an error
+ Result<std::vector<ManifestFile>> FilterManifests(
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ const std::vector<const ManifestFile*>& manifests,
+ const ManifestWriterFactory& writer_factory);
+
+ private:
+ /// \brief Returns true if the manifest might contain files matching any
expression.
+ Result<bool> CanContainExpressionDeletes(const ManifestFile& manifest,
+ const std::shared_ptr<Schema>&
schema,
+ const PartitionSpecsById&
specs_by_id);
+
+ /// \brief Returns true if the manifest might contain files in a dropped
partition.
+ ///
+ /// Checks whether the manifest's partition_spec_id matches any spec_id
registered
+ /// via DropPartition(). Manifests from a different spec cannot contain the
dropped
+ /// partition values.
+ Result<bool> CanContainDroppedPartitions(const ManifestFile& manifest) const;
+
+ /// \brief Returns true if the manifest might contain path-deleted files (currently: whenever any delete path is registered).
+ Result<bool> CanContainDroppedFiles(const ManifestFile& manifest) const;
+
+ /// \brief Returns true if the manifest possibly contains any deleted file.
+ Result<bool> CanContainDeletedFiles(const ManifestFile& manifest,
+ const std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById& specs_by_id,
+ bool trust_manifest_references);
+
+ bool CanTrustManifestReferences(
+ const std::vector<const ManifestFile*>& manifests) const;  // currently always false (see the TODO in the implementation)
+
+ Result<ManifestFile> FilterManifest(const std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById& specs_by_id,
+ const ManifestFile& manifest,
+ bool trust_manifest_references,
+ const ManifestWriterFactory&
writer_factory,
+ std::unordered_set<std::string>&
found_paths);  // filters one manifest: returns it unchanged or the descriptor of its rewritten replacement
+
+ Result<bool> ManifestHasDeletedFiles(const std::vector<ManifestEntry>&
entries,
+ const std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById& specs_by_id,
+ int32_t manifest_spec_id);  // true as soon as one entry satisfies ShouldDelete()
+
+ Result<ManifestFile> FilterManifestWithDeletedFiles(
+ const std::vector<ManifestEntry>& entries, int32_t manifest_spec_id,
+ const std::shared_ptr<Schema>& schema, const PartitionSpecsById&
specs_by_id,
+ const ManifestWriterFactory& writer_factory,
+ std::unordered_set<std::string>& found_paths);  // writes the replacement manifest; records found delete paths and deleted files
+
+ Status ValidateRequiredDeletes(
+ const std::unordered_set<std::string>& found_paths) const;  // errors when FailMissingDeletePaths() is set and a registered path is absent from found_paths
+
+ /// \brief Get or create a ManifestEvaluator for the given spec.
+ Result<ManifestEvaluator*> GetManifestEvaluator(const
std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById&
specs_by_id,
+ int32_t spec_id);
+
+ /// \brief Get or create a ResidualEvaluator for the given spec.
+ Result<ResidualEvaluator*> GetResidualEvaluator(const
std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById&
specs_by_id,
+ int32_t spec_id);
+
+ /// \brief Check whether a single entry should be deleted; returns an error instead of true when FailAnyDelete() is set.
+ Result<bool> ShouldDelete(const ManifestEntry& entry,
+ const std::shared_ptr<Schema>& schema,
+ const PartitionSpecsById& specs_by_id,
+ int32_t manifest_spec_id);
+
+ const ManifestContent manifest_content_;  // content type this manager filters; fixed at construction
+ std::shared_ptr<FileIO> file_io_;  // used to read the manifest list and manifests
+
+ std::shared_ptr<Expression> delete_expr_;  // OR of all registered row filters; AlwaysFalse() until one is registered
+ std::unordered_set<std::string> delete_paths_;  // exact paths from both DeleteFile() overloads
+ DataFileSet delete_files_;  // file objects registered for deletion plus copies of files deleted during filtering
+ PartitionSet drop_partitions_;  // (spec_id, partition) pairs from DropPartition()
+ bool fail_missing_delete_paths_{false};
+ bool fail_any_delete_{false};
+ bool case_sensitive_{true};
+
+ std::unordered_map<int32_t, std::unique_ptr<ManifestEvaluator>>
+ manifest_evaluator_cache_;  // keyed by partition spec ID; cleared whenever the filter, case sensitivity or filtering run changes
+ std::unordered_map<int32_t, std::unique_ptr<ResidualEvaluator>>
+ residual_evaluator_cache_;  // keyed by partition spec ID; cleared together with manifest_evaluator_cache_
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.cc
b/src/iceberg/manifest/manifest_merge_manager.cc
new file mode 100644
index 00000000..056dce3f
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.cc
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <algorithm>
+#include <array>
+#include <iterator>
+#include <map>
+#include <ranges>
+#include <utility>
+#include <vector>
+
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+ManifestMergeManager::ManifestMergeManager(int64_t target_size_bytes,
+ int32_t min_count_to_merge, bool
merge_enabled)
+ : target_size_bytes_(target_size_bytes),
+ min_count_to_merge_(min_count_to_merge),
+ merge_enabled_(merge_enabled) {}
+
+// Builds the manifest list for the snapshot being committed.
+//
+// New manifests are placed ahead of existing ones, the combined list is split into
+// groups keyed by (partition_spec_id, content), and every group is handed to
+// MergeGroup(). When merging is disabled, or there is nothing to merge, the
+// combined list is returned as plain copies.
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeManifests(
+    const std::vector<ManifestFile>& existing_manifests,
+    const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  // Non-owning view over both inputs: new manifests first, then existing ones.
+  std::vector<const ManifestFile*> ordered;
+  ordered.reserve(new_manifests.size() + existing_manifests.size());
+  for (const ManifestFile& manifest : new_manifests) {
+    ordered.push_back(&manifest);
+  }
+  for (const ManifestFile& manifest : existing_manifests) {
+    ordered.push_back(&manifest);
+  }
+
+  // Fast path: return copies in the combined order without touching any file.
+  if (!merge_enabled_ || ordered.empty()) {
+    std::vector<ManifestFile> passthrough;
+    passthrough.reserve(ordered.size());
+    for (const ManifestFile* manifest : ordered) {
+      passthrough.push_back(*manifest);
+    }
+    return passthrough;
+  }
+
+  // The first manifest seen for a content type is the newest one of that type.
+  std::map<ManifestContent, const ManifestFile*> newest_by_content;
+
+  // Manifests are never merged across partition specs or content types. The
+  // descending key order keeps the reverse spec ordering that preserves the v3
+  // first-row-id assignment order.
+  using GroupKey = std::pair<int32_t, ManifestContent>;
+  std::map<GroupKey, std::vector<const ManifestFile*>, std::greater<>> groups;
+
+  for (const ManifestFile* manifest : ordered) {
+    newest_by_content.try_emplace(manifest->content, manifest);
+    groups[GroupKey{manifest->partition_spec_id, manifest->content}].push_back(manifest);
+  }
+
+  std::vector<ManifestFile> merged_manifests;
+  merged_manifests.reserve(ordered.size());
+  for (auto& [key, members] : groups) {
+    const ManifestFile* newest = newest_by_content.at(key.second);
+    ICEBERG_ASSIGN_OR_RAISE(auto group_output,
+                            MergeGroup(members, newest, snapshot_id, metadata, file_io,
+                                       writer_factory));
+    for (ManifestFile& manifest : group_output) {
+      merged_manifests.push_back(std::move(manifest));
+    }
+  }
+  return merged_manifests;
+}
+
+// Packs one (spec, content) group into bins and merges each bin.
+//
+// Mirrors packEnd(group, ManifestFile::length) with a lookback of 1:
+//   1. Walk the group back to front (oldest manifest first).
+//   2. Keep a single open bin; when the next manifest would push it past
+//      target_size_bytes_, close it and open a new one.
+//   3. Restore the original manifest order inside every bin.
+//   4. Emit the bins last-closed first, so the bin holding the newest manifest
+//      comes first and is the one that may be under-filled.
+Result<std::vector<ManifestFile>> ManifestMergeManager::MergeGroup(
+    const std::vector<const ManifestFile*>& group, const ManifestFile* first,
+    int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  std::vector<std::vector<const ManifestFile*>> packed;
+  std::vector<const ManifestFile*> open_bin;
+  int64_t open_bin_bytes = 0;
+
+  for (auto it = group.rbegin(); it != group.rend(); ++it) {
+    const ManifestFile* candidate = *it;
+    // An empty bin accepts anything, so an oversized manifest ends up alone.
+    const bool fits =
+        open_bin.empty() ||
+        open_bin_bytes + candidate->manifest_length <= target_size_bytes_;
+    if (!fits) {
+      packed.push_back(std::move(open_bin));
+      open_bin.clear();
+      open_bin_bytes = 0;
+    }
+    open_bin.push_back(candidate);
+    open_bin_bytes += candidate->manifest_length;
+  }
+  if (!open_bin.empty()) {
+    packed.push_back(std::move(open_bin));
+  }
+
+  std::vector<ManifestFile> output;
+  output.reserve(group.size());
+  // TODO(Guotao): Flush independent bins in parallel and cache successful merged bins
+  // for commit retries.
+  for (auto bin_it = packed.rbegin(); bin_it != packed.rend(); ++bin_it) {
+    std::vector<const ManifestFile*>& bin = *bin_it;
+    std::ranges::reverse(bin);
+
+    // Only the bin with the newest manifest is subject to min_count_to_merge_:
+    // when it is too small its manifests are passed through unchanged.
+    const bool holds_newest = std::ranges::find(bin, first) != bin.end();
+    const bool below_threshold = std::cmp_less(bin.size(), min_count_to_merge_);
+    if (holds_newest && below_threshold) {
+      for (const ManifestFile* manifest : bin) {
+        output.push_back(*manifest);
+      }
+      continue;
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto flushed, FlushBin(bin, snapshot_id, metadata, file_io, writer_factory));
+    output.push_back(std::move(flushed));
+  }
+
+  return output;
+}
+
+// Writes one merged manifest containing the entries of every manifest in `bin`.
+//
+// A bin holding a single manifest is returned as-is. Otherwise each entry is
+// re-written according to its status and the snapshot that produced it:
+//   - DELETED by `snapshot_id`     -> kept as DELETED
+//   - DELETED by an older snapshot -> dropped
+//   - ADDED by `snapshot_id`       -> kept as ADDED
+//   - everything else              -> written as EXISTING
+Result<ManifestFile> ManifestMergeManager::FlushBin(
+    const std::vector<const ManifestFile*>& bin, int64_t snapshot_id,
+    const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+    const ManifestWriterFactory& writer_factory) {
+  if (bin.size() == 1) {
+    return *bin.front();
+  }
+
+  // All manifests in a bin share spec and content, so the head describes the output.
+  const ManifestFile& head = *bin.front();
+  const int32_t spec_id = head.partition_spec_id;
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto writer, writer_factory(spec_id, head.content));
+
+  for (const ManifestFile* source : bin) {
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(*source, file_io, schema, spec));
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      const bool from_this_snapshot =
+          entry.snapshot_id.has_value() && entry.snapshot_id.value() == snapshot_id;
+
+      if (entry.status == ManifestStatus::kDeleted) {
+        // Tombstones written by earlier snapshots are not carried forward.
+        if (from_this_snapshot) {
+          ICEBERG_RETURN_UNEXPECTED(writer->WriteDeletedEntry(entry));
+        }
+        continue;
+      }
+
+      if (from_this_snapshot && entry.status == ManifestStatus::kAdded) {
+        ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+        continue;
+      }
+
+      // ADDED or EXISTING entries from prior snapshots are recorded as EXISTING.
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteExistingEntry(entry));
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+  return writer->ToManifestFile();
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_merge_manager.h
b/src/iceberg/manifest/manifest_merge_manager.h
new file mode 100644
index 00000000..16cc8d98
--- /dev/null
+++ b/src/iceberg/manifest/manifest_merge_manager.h
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/manifest/manifest_merge_manager.h
+/// Merges small manifests into fewer larger ones according to table
properties.
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+
+namespace iceberg {
+
+/// \brief Merges small manifests into larger ones using greedy bin-packing.
+///
+/// Manifests are grouped by (partition_spec_id, content) before merging;
+/// manifests with different spec IDs or different content types are never merged
+/// together. Within a group, manifests are accumulated into bins until a bin
+/// would exceed target_size_bytes, at which point the bin is closed and a new one
+/// started. A manifest already larger than target_size_bytes ends up alone in its
+/// bin and passes through unchanged.
+///
+/// \note This class is non-copyable and non-movable.
+class ICEBERG_EXPORT ManifestMergeManager {
+ public:
+  /// \brief Construct a merge manager with the given configuration.
+  ///
+  /// \param target_size_bytes Target output manifest size in bytes
+  /// \param min_count_to_merge Minimum number of manifests that the bin holding
+  ///        the newest manifest must contain before that bin is merged
+  /// \param merge_enabled Whether merging is enabled at all
+  ManifestMergeManager(int64_t target_size_bytes, int32_t min_count_to_merge,
+                       bool merge_enabled);
+
+  ManifestMergeManager(const ManifestMergeManager&) = delete;
+  ManifestMergeManager& operator=(const ManifestMergeManager&) = delete;
+
+  /// \brief Merge existing and new manifests according to configured thresholds.
+  ///
+  /// Manifests are grouped by (partition_spec_id, content): data and delete
+  /// manifests are never merged together. Within each group, a greedy bin-packing
+  /// algorithm combines manifests up to target_size_bytes. The bin that contains
+  /// the newest manifest for that content type is protected by min_count_to_merge:
+  /// if it has fewer than that many items it is passed through unchanged.
+  ///
+  /// When merging is disabled, or both inputs are empty, the result is the new
+  /// manifests followed by the existing manifests, unchanged.
+  ///
+  /// \note Retry and rollback cleanup are handled by the caller that owns created
+  /// manifest paths.
+  /// TODO(Guotao): Add explicit replaced-manifest tracking here if callers need direct
+  /// access.
+  ///
+  /// \param existing_manifests Manifests already in the base snapshot
+  /// \param new_manifests Newly written manifests to incorporate
+  /// \param snapshot_id The ID of the snapshot being committed. Used to preserve
+  ///        ADDED/DELETED status for entries written by this snapshot and to
+  ///        suppress stale DELETED tombstones from prior snapshots.
+  /// \param metadata Table metadata (provides specs and schema for readers)
+  /// \param file_io File IO used to open existing manifests for reading
+  /// \param writer_factory Factory to create new ManifestWriter instances
+  /// \return The merged manifest list, or an error
+  Result<std::vector<ManifestFile>> MergeManifests(
+      const std::vector<ManifestFile>& existing_manifests,
+      const std::vector<ManifestFile>& new_manifests, int64_t snapshot_id,
+      const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+      const ManifestWriterFactory& writer_factory);
+
+ private:
+  /// \brief Merge a group of manifests sharing the same spec_id and content type.
+  ///
+  /// \param group Manifests of one (partition_spec_id, content) group, newest first
+  /// \param first The first (newest) manifest of the group's content type, used to
+  ///        apply the min_count_to_merge threshold on the bin that contains it.
+  /// \return One manifest per merged bin plus any manifests passed through
+  Result<std::vector<ManifestFile>> MergeGroup(
+      const std::vector<const ManifestFile*>& group, const ManifestFile* first,
+      int64_t snapshot_id, const TableMetadata& metadata, std::shared_ptr<FileIO> file_io,
+      const ManifestWriterFactory& writer_factory);
+
+  /// \brief Write a merged manifest from all manifests in a bin.
+  ///
+  /// A bin with a single manifest is returned as-is without being rewritten.
+  /// Otherwise entries are written snapshot-aware:
+  ///  - ADDED from snapshot_id -> WriteAddedEntry (preserve status)
+  ///  - DELETED from snapshot_id -> WriteDeletedEntry (preserve tombstone)
+  ///  - DELETED from older snapshots -> dropped (stale tombstones are not carried
+  ///    forward)
+  ///  - All other entries -> WriteExistingEntry
+  Result<ManifestFile> FlushBin(const std::vector<const ManifestFile*>& bin,
+                                int64_t snapshot_id, const TableMetadata& metadata,
+                                std::shared_ptr<FileIO> file_io,
+                                const ManifestWriterFactory& writer_factory);
+
+  /// Size limit, in bytes, that a bin of manifests may reach before it is closed.
+  const int64_t target_size_bytes_;
+  /// Size threshold (in manifests) applied to the bin holding the newest manifest.
+  const int32_t min_count_to_merge_;
+  /// When false, MergeManifests returns its inputs without merging.
+  const bool merge_enabled_;
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/manifest/manifest_writer.h
b/src/iceberg/manifest/manifest_writer.h
index cc57f25f..0eaf478d 100644
--- a/src/iceberg/manifest/manifest_writer.h
+++ b/src/iceberg/manifest/manifest_writer.h
@@ -22,6 +22,7 @@
/// \file iceberg/manifest/manifest_writer.h
/// Data writer interface for manifest files and manifest list files.
+#include <functional>
#include <memory>
#include <string>
#include <vector>
@@ -163,6 +164,10 @@ class ICEBERG_EXPORT ManifestWriter {
std::unique_ptr<PartitionSummary> partition_summary_;
};
+/// \brief Factory type for creating ManifestWriter instances.
+///
+/// Called with the partition spec ID and the content type of the manifest to be
+/// written; yields the new writer, or an error if it could not be created.
+using ManifestWriterFactory = std::function<Result<std::unique_ptr<ManifestWriter>>(
+    int32_t spec_id, ManifestContent content)>;
+
/// \brief Write manifest files to a manifest list file.
class ICEBERG_EXPORT ManifestListWriter {
public:
diff --git a/src/iceberg/manifest/meson.build b/src/iceberg/manifest/meson.build
index 41e685ff..d4b039a6 100644
--- a/src/iceberg/manifest/meson.build
+++ b/src/iceberg/manifest/meson.build
@@ -18,8 +18,10 @@
install_headers(
[
'manifest_entry.h',
+ 'manifest_filter_manager.h',
'manifest_group.h',
'manifest_list.h',
+ 'manifest_merge_manager.h',
'manifest_reader.h',
'manifest_writer.h',
'rolling_manifest_writer.h',
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 0b5f269d..03dc2447 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -67,8 +67,10 @@ iceberg_sources = files(
'location_provider.cc',
'manifest/manifest_adapter.cc',
'manifest/manifest_entry.cc',
+ 'manifest/manifest_filter_manager.cc',
'manifest/manifest_group.cc',
'manifest/manifest_list.cc',
+ 'manifest/manifest_merge_manager.cc',
'manifest/manifest_reader.cc',
'manifest/manifest_util.cc',
'manifest/manifest_writer.cc',
diff --git a/src/iceberg/schema.cc b/src/iceberg/schema.cc
index 00905378..5fdd4799 100644
--- a/src/iceberg/schema.cc
+++ b/src/iceberg/schema.cc
@@ -40,6 +40,8 @@ Schema::Schema(std::vector<SchemaField> fields, int32_t
schema_id)
schema_id_(schema_id),
cache_(std::make_unique<SchemaCache>(this)) {}
+// Defaulted here rather than in the header.
+// NOTE(review): presumably so the std::unique_ptr<SchemaCache> member `cache_` is
+// destroyed in a translation unit where SchemaCache is a complete type - confirm.
+Schema::~Schema() = default;
+
Result<std::unique_ptr<Schema>> Schema::Make(std::vector<SchemaField> fields,
int32_t schema_id,
std::vector<int32_t>
identifier_field_ids) {
diff --git a/src/iceberg/schema.h b/src/iceberg/schema.h
index 3c84bc2a..791ed5c8 100644
--- a/src/iceberg/schema.h
+++ b/src/iceberg/schema.h
@@ -57,6 +57,8 @@ class ICEBERG_EXPORT Schema : public StructType {
explicit Schema(std::vector<SchemaField> fields, int32_t schema_id =
kInitialSchemaId);
+  /// \brief Destructor; declared here and defined (defaulted) in schema.cc.
+  ~Schema() override;
+
/// \brief Create a schema.
///
/// \param fields The fields that make up the schema.
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index afafc4c1..791ad9be 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -180,6 +180,7 @@ if(ICEBERG_BUILD_BUNDLE)
delete_file_index_test.cc
manifest_group_test.cc
manifest_list_versions_test.cc
+ manifest_merge_manager_test.cc
manifest_reader_stats_test.cc
manifest_reader_test.cc
manifest_writer_versions_test.cc
@@ -205,6 +206,7 @@ if(ICEBERG_BUILD_BUNDLE)
SOURCES
expire_snapshots_test.cc
fast_append_test.cc
+ manifest_filter_manager_test.cc
name_mapping_update_test.cc
snapshot_manager_test.cc
transaction_test.cc
diff --git a/src/iceberg/test/manifest_filter_manager_test.cc
b/src/iceberg/test/manifest_filter_manager_test.cc
new file mode 100644
index 00000000..7810509f
--- /dev/null
+++ b/src/iceberg/test/manifest_filter_manager_test.cc
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_filter_manager.h"
+
+#include <format>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/expression/expression.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// \brief Fixture for ManifestFilterManager tests.
+///
+/// Extends MinimalUpdateTestBase and relies on members it does not declare itself
+/// (table_, file_io_, table_location_), presumably provided by that base. SetUp()
+/// prepares two data files, file_a_ and file_b_, that live in different partitions
+/// (x = 1 and x = 2).
+class ManifestFilterManagerTest : public MinimalUpdateTestBase {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    MinimalUpdateTestBase::SetUp();
+
+    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
+    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());
+
+    // Two files in different partitions (identity(x))
+    file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L);
+    file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L);
+  }
+
+  /// Builds a Parquet data file located at table_location_ + path whose single
+  /// partition value is partition_x, using the table's current spec ID.
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path, int64_t partition_x) {
+    auto f = std::make_shared<DataFile>();
+    f->content = DataFile::Content::kData;
+    f->file_path = table_location_ + path;
+    f->file_format = FileFormatType::kParquet;
+    f->partition = PartitionValues(std::vector<Literal>{Literal::Long(partition_x)});
+    f->file_size_in_bytes = 1024;
+    f->record_count = 100;
+    f->partition_spec_id = spec_->spec_id();
+    return f;
+  }
+
+  /// Appends the files with a fast append, commits, refreshes the table, and returns
+  /// the table's current snapshot.
+  Result<std::shared_ptr<Snapshot>> CommitFiles(
+      const std::vector<std::shared_ptr<DataFile>>& files) {
+    ICEBERG_ASSIGN_OR_RAISE(auto fa, table_->NewFastAppend());
+    for (const auto& f : files) fa->AppendFile(f);
+    ICEBERG_RETURN_UNEXPECTED(fa->Commit());
+    ICEBERG_RETURN_UNEXPECTED(table_->Refresh());
+    return table_->current_snapshot();
+  }
+
+  /// Returns a factory that writes manifests to
+  /// "<table_location_>/metadata/flt-<N>.avro", with N taken from manifest_counter_.
+  ///
+  /// The factory captures `this` and holds `metadata` by reference, so both the
+  /// fixture and the metadata object must outlive every call to it.
+  ManifestWriterFactory MakeWriterFactory(const TableMetadata& metadata) {
+    auto fv = metadata.format_version;
+    return [this, fv, &metadata](int32_t spec_id, ManifestContent content)
+               -> Result<std::unique_ptr<ManifestWriter>> {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+      auto path =
+          std::format("{}/metadata/flt-{}.avro", table_location_, manifest_counter_++);
+      return ManifestWriter::MakeWriter(fv, kTestSnapshotId, path, file_io_, spec, schema,
+                                        content);
+    };
+  }
+
+  /// Indexes the partition specs of `metadata` by their spec ID.
+  ManifestFilterManager::PartitionSpecsById SpecsById(const TableMetadata& metadata) {
+    ManifestFilterManager::PartitionSpecsById specs_by_id;
+    for (const auto& spec : metadata.partition_specs) {
+      specs_by_id.emplace(spec->spec_id(), spec);
+    }
+    return specs_by_id;
+  }
+
+  /// Reads all entries from a list of ManifestFiles, in manifest order.
+  Result<std::vector<ManifestEntry>> ReadAllEntries(
+      const std::vector<ManifestFile>& manifests, const TableMetadata& metadata) {
+    // The schema does not depend on the manifest, so look it up once.
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+
+    std::vector<ManifestEntry> result;
+    for (const auto& m : manifests) {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata.PartitionSpecById(m.partition_spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(m, file_io_, schema, spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+      result.insert(result.end(), entries.begin(), entries.end());
+    }
+    return result;
+  }
+
+  static constexpr int64_t kTestSnapshotId = 55555L;
+  // Sequence number used to give every manifest written by the factory a unique path.
+  int manifest_counter_ = 0;
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<DataFile> file_a_;
+  std::shared_ptr<DataFile> file_b_;
+};
+
+// Filtering with no base snapshot must yield an empty manifest list.
+TEST_F(ManifestFilterManagerTest, NullSnapshotReturnsEmpty) {
+  auto& metadata = *table_->metadata();
+  auto writer_factory = MakeWriterFactory(metadata);
+
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  ICEBERG_UNWRAP_OR_FAIL(auto filtered,
+                         manager.FilterManifests(metadata, nullptr, writer_factory));
+
+  EXPECT_TRUE(filtered.empty());
+}
+
+// ContainsDeletes() is false for a fresh manager and true once a path is registered.
+TEST_F(ManifestFilterManagerTest, ContainsDeletesReturnsCorrectState) {
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  EXPECT_FALSE(manager.ContainsDeletes());
+
+  manager.DeleteFile("/some/path.parquet");
+
+  EXPECT_TRUE(manager.ContainsDeletes());
+}
+
+// A null row-filter expression is rejected as an invalid argument.
+TEST_F(ManifestFilterManagerTest, DeleteByRowFilterRejectsNull) {
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+
+  EXPECT_THAT(manager.DeleteByRowFilter(nullptr),
+              IsError(ErrorKind::kInvalidArgument));
+}
+
+// A null DataFile pointer is rejected as an invalid argument.
+TEST_F(ManifestFilterManagerTest, DeleteFileObjectRejectsNull) {
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  std::shared_ptr<DataFile> missing_file = nullptr;
+
+  EXPECT_THAT(manager.DeleteFile(missing_file), IsError(ErrorKind::kInvalidArgument));
+}
+
+// With no delete condition registered, every manifest keeps its original path.
+TEST_F(ManifestFilterManagerTest, NoConditionsReturnsManifestsUnchanged) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, CommitFiles({file_a_, file_b_}));
+  auto& metadata = *table_->metadata();
+  auto writer_factory = MakeWriterFactory(metadata);
+
+  // The manifest list as committed is the expected output.
+  ICEBERG_UNWRAP_OR_FAIL(auto list_reader,
+                         ManifestListReader::Make(snapshot->manifest_list, file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto expected, list_reader->Files());
+
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  ICEBERG_UNWRAP_OR_FAIL(auto filtered,
+                         manager.FilterManifests(metadata, snapshot, writer_factory));
+
+  ASSERT_EQ(filtered.size(), expected.size());
+  size_t index = 0;
+  for (const auto& manifest : filtered) {
+    // An untouched manifest is passed through, so its path does not change.
+    EXPECT_EQ(manifest.manifest_path, expected[index].manifest_path);
+    ++index;
+  }
+}
+
+// Deleting by path marks exactly that file DELETED and leaves the other one live.
+TEST_F(ManifestFilterManagerTest, DeleteFileByPath) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, CommitFiles({file_a_, file_b_}));
+  auto& metadata = *table_->metadata();
+  auto writer_factory = MakeWriterFactory(metadata);
+
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  manager.DeleteFile(file_a_->file_path);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto filtered,
+                         manager.FilterManifests(metadata, snapshot, writer_factory));
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(filtered, metadata));
+
+  int deleted = 0;
+  int live = 0;
+  for (const auto& entry : entries) {
+    if (entry.status != ManifestStatus::kDeleted) {
+      ++live;
+      continue;
+    }
+    ++deleted;
+    ASSERT_NE(entry.data_file, nullptr);
+    EXPECT_EQ(entry.data_file->file_path, file_a_->file_path);
+  }
+  EXPECT_EQ(deleted, 1);
+  EXPECT_EQ(live, 1);
+}
+
+// Same as DeleteFileByPath, but through the FilterManifests overload that takes the
+// schema, the specs-by-id map and the manifest pointers explicitly.
+TEST_F(ManifestFilterManagerTest, ExplicitContextFilterManifestsDeletesByPath) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, CommitFiles({file_a_, file_b_}));
+  auto& metadata = *table_->metadata();
+  auto writer_factory = MakeWriterFactory(metadata);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto list_reader,
+                         ManifestListReader::Make(snapshot->manifest_list, file_io_));
+  ICEBERG_UNWRAP_OR_FAIL(auto manifest_files, list_reader->Files());
+
+  // Non-owning pointers into manifest_files, which stays alive for the whole test.
+  std::vector<const ManifestFile*> manifest_ptrs;
+  manifest_ptrs.reserve(manifest_files.size());
+  for (const auto& manifest_file : manifest_files) {
+    manifest_ptrs.push_back(&manifest_file);
+  }
+
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  manager.DeleteFile(file_a_->file_path);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto filtered,
+                         manager.FilterManifests(schema_, SpecsById(metadata),
+                                                 manifest_ptrs, writer_factory));
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(filtered, metadata));
+
+  int deleted = 0;
+  for (const auto& entry : entries) {
+    if (entry.status != ManifestStatus::kDeleted) {
+      continue;
+    }
+    ++deleted;
+    ASSERT_NE(entry.data_file, nullptr);
+    EXPECT_EQ(entry.data_file->file_path, file_a_->file_path);
+  }
+  EXPECT_EQ(deleted, 1);
+}
+
+// An AlwaysTrue row filter must mark every committed file as DELETED.
+TEST_F(ManifestFilterManagerTest, RowFilterAlwaysTrueDeletesAll) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysTrue()), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  // Pin the entry count: without it the loop below passes vacuously when no entries
+  // are read back (e.g. if the filtered manifests were dropped or left empty).
+  ASSERT_EQ(entries.size(), 2U);
+  for (const auto& e : entries) {
+    EXPECT_EQ(e.status, ManifestStatus::kDeleted) << "Expected all entries to be DELETED";
+  }
+}
+
+// An AlwaysFalse row filter must not delete anything.
+TEST_F(ManifestFilterManagerTest, RowFilterAlwaysFalseDeletesNone) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysFalse()), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  // Both committed files must still be present; otherwise the loop below would pass
+  // vacuously on an empty entry list.
+  ASSERT_EQ(entries.size(), 2U);
+  for (const auto& e : entries) {
+    // AlwaysFalse means nothing can match, so entries remain ADDED or EXISTING.
+    EXPECT_NE(e.status, ManifestStatus::kDeleted) << "Expected no entries to be DELETED";
+  }
+}
+
+// A case-insensitive filter on the partition column ("X" == 1) deletes only the file
+// in partition x = 1 and reports it through FilesToBeDeleted().
+TEST_F(ManifestFilterManagerTest, RowFilterUsesPartitionResiduals) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, CommitFiles({file_a_, file_b_}));
+  auto& metadata = *table_->metadata();
+  auto writer_factory = MakeWriterFactory(metadata);
+
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  manager.CaseSensitive(false);
+  ASSERT_THAT(manager.DeleteByRowFilter(Expressions::Equal("X", Literal::Long(1L))),
+              IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto filtered,
+                         manager.FilterManifests(metadata, snapshot, writer_factory));
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(filtered, metadata));
+
+  int deleted = 0;
+  int live = 0;
+  for (const auto& entry : entries) {
+    ASSERT_NE(entry.data_file, nullptr);
+    const bool is_deleted = entry.status == ManifestStatus::kDeleted;
+    if (is_deleted) {
+      ++deleted;
+      EXPECT_EQ(entry.data_file->file_path, file_a_->file_path);
+      continue;
+    }
+    ++live;
+    EXPECT_EQ(entry.data_file->file_path, file_b_->file_path);
+  }
+
+  EXPECT_EQ(deleted, 1);
+  EXPECT_EQ(live, 1);
+  ASSERT_EQ(manager.FilesToBeDeleted().size(), 1U);
+  EXPECT_EQ(manager.FilesToBeDeleted().begin()->get()->file_path, file_a_->file_path);
+}
+
+// Dropping partition x = 1 deletes file_a and must leave file_b (x = 2) live.
+TEST_F(ManifestFilterManagerTest, DropPartition) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  // Drop partition of file_a (partition_x = 1)
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DropPartition(spec_->spec_id(),
+                    PartitionValues(std::vector<Literal>{Literal::Long(1L)}));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  int deleted_count = 0;
+  int live_count = 0;
+  for (const auto& e : entries) {
+    ASSERT_NE(e.data_file, nullptr);
+    if (e.status == ManifestStatus::kDeleted) {
+      ++deleted_count;
+      EXPECT_EQ(e.data_file->file_path, file_a_->file_path);
+    } else {
+      // The file in the other partition must survive the drop.
+      ++live_count;
+      EXPECT_EQ(e.data_file->file_path, file_b_->file_path);
+    }
+  }
+  EXPECT_EQ(deleted_count, 1);
+  EXPECT_EQ(live_count, 1);
+}
+
+TEST_F(ManifestFilterManagerTest, FailMissingDeletePathsReturnsError) {
+ ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+ auto* metadata = table_->metadata().get();
+ auto factory = MakeWriterFactory(*metadata);
+
+ ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+ mgr.DeleteFile("/does/not/exist.parquet");
+ mgr.FailMissingDeletePaths();
+
+ auto result = mgr.FilterManifests(*metadata, snap, factory);
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+}
+
+TEST_F(ManifestFilterManagerTest, FailAnyDeleteReportsPartitionPath) {
+ ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+ auto* metadata = table_->metadata().get();
+ auto factory = MakeWriterFactory(*metadata);
+
+ ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+ mgr.DeleteFile(file_a_->file_path);
+ mgr.FailAnyDelete();
+
+ auto result = mgr.FilterManifests(*metadata, snap, factory);
+ EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
+ EXPECT_THAT(result, HasErrorMessage("x=1"));
+}
+
+// Delete conditions are OR-combined: a path delete plus an AlwaysTrue row filter
+// must delete every file.
+TEST_F(ManifestFilterManagerTest, MultipleConditionsOrCombined) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snap, CommitFiles({file_a_, file_b_}));
+  auto* metadata = table_->metadata().get();
+  auto factory = MakeWriterFactory(*metadata);
+
+  // Both files should be deleted: file_a by path, file_b by AlwaysTrue expression
+  ManifestFilterManager mgr(ManifestContent::kData, file_io_);
+  mgr.DeleteFile(file_a_->file_path);
+  ASSERT_THAT(mgr.DeleteByRowFilter(Expressions::AlwaysTrue()), IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, mgr.FilterManifests(*metadata, snap, factory));
+
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(result, *metadata));
+  // Pin the entry count so the per-entry check cannot pass on an empty list.
+  ASSERT_EQ(entries.size(), 2U);
+  for (const auto& e : entries) {
+    EXPECT_EQ(e.status, ManifestStatus::kDeleted);
+  }
+}
+
+// Two row filters are registered one after the other; the single committed file
+// (partition x = 1) must end up DELETED.
+TEST_F(ManifestFilterManagerTest, MultipleRowFiltersUseCombinedExpression) {
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, CommitFiles({file_a_}));
+  auto& metadata = *table_->metadata();
+  auto writer_factory = MakeWriterFactory(metadata);
+
+  ManifestFilterManager manager(ManifestContent::kData, file_io_);
+  ASSERT_THAT(manager.DeleteByRowFilter(Expressions::Equal("y", Literal::Long(7L))),
+              IsOk());
+  ASSERT_THAT(manager.DeleteByRowFilter(Expressions::Equal("x", Literal::Long(1L))),
+              IsOk());
+
+  ICEBERG_UNWRAP_OR_FAIL(auto filtered,
+                         manager.FilterManifests(metadata, snapshot, writer_factory));
+  ICEBERG_UNWRAP_OR_FAIL(auto entries, ReadAllEntries(filtered, metadata));
+
+  ASSERT_EQ(entries.size(), 1U);
+  EXPECT_EQ(entries.front().status, ManifestStatus::kDeleted);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/test/manifest_merge_manager_test.cc
b/src/iceberg/test/manifest_merge_manager_test.cc
new file mode 100644
index 00000000..b19eace8
--- /dev/null
+++ b/src/iceberg/test/manifest_merge_manager_test.cc
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/manifest/manifest_merge_manager.h"
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_io_util.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/file_format.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/result.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/schema_field.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+namespace {
+
+// Format version handed to the metadata builder and to every manifest writer.
+constexpr int8_t kFormatVersion{2};
+// Snapshot id handed to manifest writers and to MergeManifests.
+constexpr int64_t kSnapshotId{12345};
+// Ids of the two partition specs created by the fixture.
+constexpr int32_t kSpecId0{0};
+constexpr int32_t kSpecId1{1};
+
+}  // namespace
+
+// Fixture for ManifestMergeManager tests. It owns a mock FileIO, a one-column
+// schema, two partition specs and a TableMetadata that registers both specs, and
+// offers helpers to write input manifests and to count entries in the output.
+class ManifestMergeManagerTest : public ::testing::Test {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    file_io_ = arrow::MakeMockFileIO();
+
+    // Schema: a single required long column "x" with field id 1.
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(1, "x", int64()),
+    });
+
+    // spec0_ is an identity transform on x; spec1_ is an 8-way bucket on x.
+    spec0_ =
+        PartitionSpec::Make(kSpecId0, {PartitionField(1, 1000, "x", Transform::Identity())})
+            .value();
+    spec1_ = PartitionSpec::Make(
+                 kSpecId1, {PartitionField(1, 1001, "x_bucket", Transform::Bucket(8))})
+                 .value();
+
+    // Minimal table metadata that knows about both specs.
+    auto builder = TableMetadataBuilder::BuildFromEmpty(kFormatVersion);
+    builder->SetCurrentSchema(schema_, schema_->HighestFieldId().value_or(0));
+    builder->SetDefaultPartitionSpec(spec0_);
+    builder->AddPartitionSpec(spec1_);
+    builder->SetDefaultSortOrder(SortOrder::Unsorted());
+    ICEBERG_UNWRAP_OR_FAIL(auto built, builder->Build());
+    metadata_ = std::shared_ptr<TableMetadata>(std::move(built));
+  }
+
+  // Returns spec0_ for kSpecId0 and spec1_ for any other id.
+  std::shared_ptr<PartitionSpec> SpecFor(int32_t spec_id) const {
+    return spec_id == kSpecId0 ? spec0_ : spec1_;
+  }
+
+  // Writes a manifest holding `num_files` added entries under the spec chosen by
+  // `spec_id` and returns its ManifestFile descriptor. The descriptor's
+  // manifest_length is replaced with `file_size_override` so tests can steer
+  // bin-packing. `content` selects a data or a delete manifest.
+  Result<ManifestFile> WriteManifest(int32_t spec_id, int num_files,
+                                     int64_t file_size_override = 512,
+                                     ManifestContent content = ManifestContent::kData) {
+    auto manifest_path = std::format("manifest-{}.avro", manifest_counter_++);
+    auto partition_spec = SpecFor(spec_id);
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto manifest_writer,
+        ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, manifest_path, file_io_,
+                                   partition_spec, schema_, content));
+
+    const bool is_delete_manifest = (content == ManifestContent::kDeletes);
+    for (int i = 0; i < num_files; ++i) {
+      auto data_file = std::make_shared<DataFile>();
+      data_file->content = is_delete_manifest ? DataFile::Content::kPositionDeletes
+                                              : DataFile::Content::kData;
+      data_file->file_path =
+          std::format("data/file-{}-{}.parquet", manifest_counter_, i);
+      data_file->file_format = FileFormatType::kParquet;
+      // Identity spec uses LONG partition values; Bucket spec uses INT
+      Literal partition_value =
+          (spec_id == kSpecId0) ? Literal::Long(i) : Literal::Int(i % 8);
+      data_file->partition = PartitionValues(std::vector<Literal>{partition_value});
+      data_file->file_size_in_bytes = 1024;
+      data_file->record_count = 10;
+      data_file->partition_spec_id = spec_id;
+      ICEBERG_RETURN_UNEXPECTED(manifest_writer->WriteAddedEntry(data_file));
+    }
+
+    ICEBERG_RETURN_UNEXPECTED(manifest_writer->Close());
+    ICEBERG_ASSIGN_OR_RAISE(auto descriptor, manifest_writer->ToManifestFile());
+    // Pretend the manifest has the requested size, regardless of what was written.
+    descriptor.manifest_length = file_size_override;
+    return descriptor;
+  }
+
+  // Builds a writer factory that records each invocation in factory_call_count_
+  // and writes to a fresh "merged-N.avro" path.
+  ManifestWriterFactory MakeWriterFactory() {
+    return [this](int32_t spec_id,
+                  ManifestContent content) -> Result<std::unique_ptr<ManifestWriter>> {
+      ++factory_call_count_;
+      auto partition_spec = SpecFor(spec_id);
+      auto merged_path = std::format("merged-{}.avro", manifest_counter_++);
+      return ManifestWriter::MakeWriter(kFormatVersion, kSnapshotId, merged_path,
+                                        file_io_, partition_spec, schema_, content);
+    };
+  }
+
+  // Reads every manifest in `manifests` and returns the total number of entries.
+  Result<int> CountEntries(const std::vector<ManifestFile>& manifests) {
+    int entry_total = 0;
+    for (const auto& manifest : manifests) {
+      auto partition_spec = SpecFor(manifest.partition_spec_id);
+      ICEBERG_ASSIGN_OR_RAISE(
+          auto manifest_reader,
+          ManifestReader::Make(manifest, file_io_, schema_, partition_spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto manifest_entries, manifest_reader->Entries());
+      entry_total += static_cast<int>(manifest_entries.size());
+    }
+    return entry_total;
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> spec0_;
+  std::shared_ptr<PartitionSpec> spec1_;
+  std::shared_ptr<TableMetadata> metadata_;
+  // Shared counter used to generate unique manifest and data-file names.
+  int manifest_counter_ = 0;
+  // Number of times the factory from MakeWriterFactory() has been invoked.
+  int factory_call_count_ = 0;
+};
+
+TEST_F(ManifestMergeManagerTest, MergeDisabled) {
+  // With merging turned off, every input manifest must come back and the writer
+  // factory must stay untouched.
+  ICEBERG_UNWRAP_OR_FAIL(auto first, WriteManifest(kSpecId0, 1));
+  ICEBERG_UNWRAP_OR_FAIL(auto second, WriteManifest(kSpecId0, 1));
+  ICEBERG_UNWRAP_OR_FAIL(auto third, WriteManifest(kSpecId0, 1));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/2,
+                                     /*enabled=*/false);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged,
+      merge_manager.MergeManifests({first, second}, {third}, kSnapshotId, *metadata_,
+                                   file_io_, MakeWriterFactory()));
+
+  // All 3 manifests are returned; no writer was ever requested.
+  EXPECT_EQ(merged.size(), 3U);
+  EXPECT_EQ(factory_call_count_, 0);
+}
+
+TEST_F(ManifestMergeManagerTest, BelowMinCountThreshold) {
+  // Two manifests in total while min_count is 3: nothing may be merged.
+  ICEBERG_UNWRAP_OR_FAIL(auto first, WriteManifest(kSpecId0, 1));
+  ICEBERG_UNWRAP_OR_FAIL(auto second, WriteManifest(kSpecId0, 1));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/3,
+                                     /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged, merge_manager.MergeManifests({first}, {second}, kSnapshotId,
+                                                *metadata_, file_io_, MakeWriterFactory()));
+
+  // Both inputs survive and the factory was never asked for a writer.
+  EXPECT_EQ(merged.size(), 2U);
+  EXPECT_EQ(factory_call_count_, 0);
+}
+
+TEST_F(ManifestMergeManagerTest, MergeOccursAtThreshold) {
+  // Three 100-byte manifests against target=1024: together they fit in one bin,
+  // and three manifests reach min_count=3.
+  ICEBERG_UNWRAP_OR_FAIL(auto first, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto second, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto third, WriteManifest(kSpecId0, 1, /*size=*/100));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/3,
+                                     /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged,
+      merge_manager.MergeManifests({first, second}, {third}, kSnapshotId, *metadata_,
+                                   file_io_, MakeWriterFactory()));
+
+  // One output manifest that carries all 3 entries.
+  EXPECT_EQ(merged.size(), 1U);
+  ICEBERG_UNWRAP_OR_FAIL(auto entry_count, CountEntries(merged));
+  EXPECT_EQ(entry_count, 3);
+}
+
+TEST_F(ManifestMergeManagerTest, OversizedManifestPassedThrough) {
+  // One manifest is larger than the target and must not be merged; the other two
+  // are small enough to fit.
+  ICEBERG_UNWRAP_OR_FAIL(auto oversized, WriteManifest(kSpecId0, 2, /*size=*/2000));
+  ICEBERG_UNWRAP_OR_FAIL(auto small_a, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto small_b, WriteManifest(kSpecId0, 1, /*size=*/100));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/2,
+                                     /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged,
+      merge_manager.MergeManifests({oversized, small_a}, {small_b}, kSnapshotId,
+                                   *metadata_, file_io_, MakeWriterFactory()));
+
+  // The oversized manifest acts as a bin boundary, so the two small manifests on
+  // either side of it are never merged together. The newest small manifest is
+  // also protected by minCountToMerge (size 1 < 2). All three remain separate.
+  EXPECT_EQ(merged.size(), 3U);
+  ICEBERG_UNWRAP_OR_FAIL(auto entry_count, CountEntries(merged));
+  EXPECT_EQ(entry_count, 4);  // 2 + 1 + 1
+}
+
+TEST_F(ManifestMergeManagerTest, CrossSpecManifestsNotMerged) {
+  // Manifests with different spec IDs must never be merged together
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec0a, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec0b, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec1a, WriteManifest(kSpecId1, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m_spec1b, WriteManifest(kSpecId1, 1, /*size=*/100));
+
+  // With 4 manifests (target large enough for each pair), we get 2 merged outputs
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/2, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result,
+      mgr.MergeManifests({m_spec0a, m_spec1a}, {m_spec0b, m_spec1b}, kSnapshotId,
+                         *metadata_, file_io_, MakeWriterFactory()));
+  EXPECT_EQ(result.size(), 2U);
+
+  // Each spec must own exactly one output manifest. Checking every element with
+  // AnyOf(spec0, spec1) would also accept two outputs sharing one spec id, so
+  // collect the ids and require one of each.
+  std::vector<int32_t> spec_ids;
+  spec_ids.reserve(result.size());
+  for (const auto& m : result) {
+    spec_ids.push_back(m.partition_spec_id);
+  }
+  EXPECT_THAT(spec_ids, ::testing::UnorderedElementsAre(kSpecId0, kSpecId1));
+}
+
+TEST_F(ManifestMergeManagerTest, WriterFactoryCalledOncePerMergedManifest) {
+  // Four small manifests split across two specs produce two merged outputs, so
+  // the writer factory must be invoked exactly twice.
+  ICEBERG_UNWRAP_OR_FAIL(auto spec0_first, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto spec0_second, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto spec1_first, WriteManifest(kSpecId1, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto spec1_second, WriteManifest(kSpecId1, 1, /*size=*/100));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/2,
+                                     /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged,
+      merge_manager.MergeManifests({spec0_first, spec1_first},
+                                   {spec0_second, spec1_second}, kSnapshotId,
+                                   *metadata_, file_io_, MakeWriterFactory()));
+
+  EXPECT_EQ(merged.size(), 2U);
+  EXPECT_EQ(factory_call_count_, 2);
+}
+
+TEST_F(ManifestMergeManagerTest, MixedContentManifestsNotMerged) {
+  // Data and delete manifests that share a spec_id must never be merged together.
+  // The grouping key is (spec_id, content), so they land in separate bins.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_a, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_b, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto deletes_a,
+      WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto deletes_b,
+      WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/2,
+                                     /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged,
+      merge_manager.MergeManifests({data_a, deletes_a}, {data_b, deletes_b},
+                                   kSnapshotId, *metadata_, file_io_,
+                                   MakeWriterFactory()));
+
+  // Two data manifests collapse into one, and so do the two delete manifests.
+  EXPECT_EQ(merged.size(), 2U);
+  int data_manifests = 0;
+  int delete_manifests = 0;
+  for (const auto& manifest : merged) {
+    if (manifest.content == ManifestContent::kData) {
+      ++data_manifests;
+    } else if (manifest.content == ManifestContent::kDeletes) {
+      ++delete_manifests;
+    }
+  }
+  EXPECT_EQ(data_manifests, 1);
+  EXPECT_EQ(delete_manifests, 1);
+}
+
+TEST_F(ManifestMergeManagerTest, MixedContentUsesFirstManifestPerContent) {
+  // The newest manifest of each content type must be protected by the min_count
+  // threshold independently of the other content type.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_a, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto data_b, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kData));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto deletes_a,
+      WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto deletes_b,
+      WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+
+  ManifestMergeManager merge_manager(/*target=*/1024, /*min_count=*/3,
+                                     /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto merged,
+      merge_manager.MergeManifests({data_a, deletes_a}, {data_b, deletes_b},
+                                   kSnapshotId, *metadata_, file_io_,
+                                   MakeWriterFactory()));
+
+  // Each content type has exactly two manifests, below min_count=3, so neither
+  // pair should be merged.
+  ASSERT_EQ(merged.size(), 4U);
+  int data_manifests = 0;
+  int delete_manifests = 0;
+  for (const auto& manifest : merged) {
+    const bool is_data = manifest.content == ManifestContent::kData;
+    const bool is_deletes = manifest.content == ManifestContent::kDeletes;
+    if (is_data) {
+      ++data_manifests;
+    } else if (is_deletes) {
+      ++delete_manifests;
+    }
+  }
+  EXPECT_EQ(data_manifests, 2);
+  EXPECT_EQ(delete_manifests, 2);
+}
+
+TEST_F(ManifestMergeManagerTest, DeleteManifestsMerged) {
+  // Delete manifests are bin-packed and merged just like data manifests.
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del0, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del1, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto del2, WriteManifest(kSpecId0, 1, /*size=*/100, ManifestContent::kDeletes));
+
+  ManifestMergeManager mgr(/*target=*/1024, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({del0, del1}, {del2}, kSnapshotId, *metadata_,
+                                      file_io_, MakeWriterFactory()));
+  // ASSERT (not EXPECT): result[0] is read next, so stop here on a size mismatch
+  // instead of indexing out of bounds.
+  ASSERT_EQ(result.size(), 1U);
+  EXPECT_EQ(result[0].content, ManifestContent::kDeletes);
+  ICEBERG_UNWRAP_OR_FAIL(auto count, CountEntries(result));
+  EXPECT_EQ(count, 3);
+}
+
+TEST_F(ManifestMergeManagerTest, PackEndOlderManifestsMergedNotNewest) {
+  // packEnd semantics: for [m0_new, m1_old, m2_old] with target=250 (pairs fit but
+  // triples don't), packing from the end merges m1+m2 (the older pair) and leaves
+  // m0 (the newest) in its own under-filled bin at the front of the output.
+  // This is the opposite of naive forward packing, which would merge m0+m1.
+  ICEBERG_UNWRAP_OR_FAIL(auto m1, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m2, WriteManifest(kSpecId0, 1, /*size=*/100));
+  ICEBERG_UNWRAP_OR_FAIL(auto m0, WriteManifest(kSpecId0, 1, /*size=*/100));
+
+  // target=250 fits two 100-byte manifests but not three.
+  // min_count=3 so m0's single-element bin is kept as-is (below threshold).
+  ManifestMergeManager mgr(/*target=*/250, /*min_count=*/3, /*enabled=*/true);
+  ICEBERG_UNWRAP_OR_FAIL(
+      auto result, mgr.MergeManifests({m1, m2}, {m0}, kSnapshotId, *metadata_, file_io_,
+                                      MakeWriterFactory()));
+  // Expected: [m0 (pass-through), merged(m1+m2)]
+  ASSERT_EQ(result.size(), 2U);
+  // First output is the newest manifest m0, passed through unchanged (under-filled
+  // bin). All three inputs share the same overridden length, so the length check
+  // alone cannot tell m0 from m1/m2; the path check pins the identity.
+  EXPECT_EQ(result[0].manifest_path, m0.manifest_path);
+  EXPECT_EQ(result[0].manifest_length, m0.manifest_length);
+  // Second output is the merged older pair — it must be a newly written manifest
+  // (different path than either original).
+  EXPECT_NE(result[1].manifest_path, m1.manifest_path);
+  EXPECT_NE(result[1].manifest_path, m2.manifest_path);
+  ICEBERG_UNWRAP_OR_FAIL(auto count, CountEntries(result));
+  EXPECT_EQ(count, 3);
+}
+
+} // namespace iceberg