gty404 commented on code in PR #682:
URL: https://github.com/apache/iceberg-cpp/pull/682#discussion_r3322446739


##########
src/iceberg/manifest/manifest_filter_manager.h:
##########
@@ -116,9 +122,36 @@ class ICEBERG_EXPORT ManifestFilterManager {
   /// manifest entry matches a delete condition.
   void FailAnyDelete();
 
+  /// \brief Returns the number of manifests rewritten (replaced) by the last
+  /// FilterManifests() call. A manifest is replaced when it contained deleted 
entries
+  /// and was rewritten with those entries marked DELETED.
+  int32_t ReplacedManifestsCount() const { return replaced_manifests_count_; }
+
   /// \brief Returns true if any delete condition has been registered.
   bool ContainsDeletes() const;
 
+  /// \brief Set the minimum data sequence number for delete files to retain.
+  ///
+  /// Only valid for ManifestContent::kDeletes managers.  Delete entries whose
+  /// data_sequence_number is positive and less than \p sequence_number will be

Review Comment:
   Thanks, will update the doc comment to avoid \\p and check the nearby 
comments for the same style issue.



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <algorithm>
+#include <span>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/delete_file_index.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+namespace {
+
+bool MatchesOperation(std::optional<std::string_view> operation,
+                      std::initializer_list<std::string_view> expected) {
+  return operation.has_value() &&
+         std::ranges::find(expected, operation.value()) != expected.end();
+}
+
+struct ValidationHistoryResult {
+  std::vector<ManifestFile> manifests;
+  std::unordered_set<int64_t> snapshot_ids;
+};
+
+struct DeleteFileObjectKey {
+  std::string path;
+  std::optional<int64_t> content_offset;
+  std::optional<int64_t> content_size_in_bytes;
+
+  bool operator==(const DeleteFileObjectKey& other) const = default;
+};
+
+struct DeleteFileObjectKeyHash {
+  size_t operator()(const DeleteFileObjectKey& key) const {
+    size_t hash = std::hash<std::string>{}(key.path);
+    auto combine = [&hash](const auto& value) {
+      size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
+      hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
+    };
+    combine(key.content_offset);
+    combine(key.content_size_in_bytes);
+    return hash;
+  }
+};
+
+DeleteFileObjectKey MakeDeleteFileObjectKey(const DataFile& file) {
+  return DeleteFileObjectKey{.path = file.file_path,
+                             .content_offset = file.content_offset,
+                             .content_size_in_bytes = 
file.content_size_in_bytes};
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> ValidationAncestorsBetween(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      SnapshotUtil::AncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+  if (latest_snapshot_id == starting_snapshot_id) {
+    return ancestors;
+  }
+  if (ancestors.empty()) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+
+  const auto& oldest_checked = ancestors.back();
+  if (oldest_checked == nullptr || 
!oldest_checked->parent_snapshot_id.has_value() ||
+      oldest_checked->parent_snapshot_id.value() != starting_snapshot_id) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+  return ancestors;
+}
+
+Result<ValidationHistoryResult> ValidationHistory(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id,
+    std::initializer_list<std::string_view> matching_operations, 
ManifestContent content,
+    const std::shared_ptr<FileIO>& io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+
+  ValidationHistoryResult result;
+  for (const auto& snapshot : ancestors) {
+    if (!MatchesOperation(snapshot->Operation(), matching_operations)) {
+      continue;
+    }
+
+    result.snapshot_ids.insert(snapshot->snapshot_id);
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, content == ManifestContent::kData
+                                                ? cached.DataManifests(io)
+                                                : cached.DeleteManifests(io));
+    for (const auto& manifest : manifests) {
+      if (manifest.added_snapshot_id == snapshot->snapshot_id) {
+        result.manifests.push_back(manifest);
+      }
+    }
+  }
+
+  return result;
+}
+
+Result<std::optional<std::string>> FindMatchingDataFile(
+    const TableMetadata& metadata, const std::vector<ManifestFile>& manifests,
+    ManifestStatus status, std::shared_ptr<Expression> filter,
+    const PartitionSet* partition_set, const std::shared_ptr<FileIO>& io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  auto partition_filter = partition_set != nullptr
+                              ? std::make_shared<PartitionSet>(*partition_set)
+                              : std::shared_ptr<PartitionSet>{};
+
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                            
metadata.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io, schema, spec));
+    reader->CaseSensitive(case_sensitive);
+    if (filter != nullptr) {
+      reader->FilterRows(filter);
+    }
+    if (partition_filter != nullptr) {
+      reader->FilterPartitions(partition_filter);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      if (entry.status == status && entry.data_file != nullptr) {
+        return entry.data_file->file_path;
+      }
+    }
+  }
+
+  return std::optional<std::string>{};
+}
+
+}  // namespace
+
+MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name,
+                                             
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)),
+      table_name_(std::move(table_name)),
+      delete_expression_(Expressions::AlwaysFalse()),
+      data_filter_manager_(ManifestContent::kData, ctx_->table->io()),
+      delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()),
+      data_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)),
+      delete_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)) {}
+
+// -------------------------------------------------------------------------
+// Primitive API
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null data file");
+  }
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Data file must have a partition spec ID");
+  }
+
+  int32_t spec_id = file->partition_spec_id.value();
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+
+  // Suppress first_row_id — it will be assigned by the commit, not inherited 
from the
+  // source file.
+  file->first_row_id = std::nullopt;
+
+  auto& data_files = new_data_files_by_spec_[spec_id];
+  auto [it, inserted] = data_files.insert(file);
+  if (inserted) {
+    has_new_data_files_ = true;
+    ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, 
*file));
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) {
+  if (file.content == DataFile::Content::kData) {
+    return InvalidArgument("Expected a delete file but got a data file: {}",
+                           file.file_path);
+  }
+  const int8_t format_version = base().format_version;
+  const bool is_dv = ContentFileUtil::IsDV(file);
+  switch (format_version) {
+    case 1:
+      return InvalidArgument("Deletes are supported in V2 and above");
+    case 2:
+      // Position deletes must NOT be DVs in v2.
+      if (file.content == DataFile::Content::kPositionDeletes && is_dv) {
+        return InvalidArgument("Must not use DVs for position deletes in V2: 
{}",
+                               file.file_path);
+      }
+      break;
+    default:
+      if (format_version >= 3) {
+        // Position deletes MUST be DVs in v3+.
+        if (file.content == DataFile::Content::kPositionDeletes && !is_dv) {
+          return InvalidArgument("Must use DVs for position deletes in V{}: 
{}",
+                                 format_version, file.file_path);
+        }
+      } else {
+        return InvalidArgument("Unsupported format version: {}", 
format_version);
+      }
+      break;
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) {
+  return AddDeleteFile(std::move(file), std::nullopt);
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            int64_t data_sequence_number) {
+  return AddDeleteFile(std::move(file), 
std::optional<int64_t>(data_sequence_number));
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            std::optional<int64_t> 
data_sequence_number) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null delete file");
+  }
+  ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file));
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Delete file must have a partition spec ID");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
base().PartitionSpecById(file->partition_spec_id.value()));
+  (void)spec;
+  has_new_delete_files_ = true;
+  new_delete_files_.push_back(PendingDeleteFile{
+      .file = std::move(file), .data_sequence_number = 
std::move(data_sequence_number)});
+  return {};
+}
+
+Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot delete a null data file");
+  }
+  return data_filter_manager_.DeleteFile(std::move(file));
+}
+
+Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) 
{
+  if (!file) {
+    return InvalidArgument("Cannot delete a null delete file");
+  }
+  return delete_filter_manager_.DeleteFile(std::move(file));
+}
+
+void MergingSnapshotUpdate::DeleteByPath(std::string_view path) {
+  data_filter_manager_.DeleteFile(path);
+}
+
+Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> 
expr) {
+  // If a delete file matches the row filter, it can also be removed because 
the rows
+  // it references will also be deleted. Both filter managers receive the 
expression.
+  delete_expression_ = expr;
+  ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr));
+  return delete_filter_manager_.DeleteByRowFilter(std::move(expr));
+}
+
+void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  // Dropping data in a partition also drops all delete files in that 
partition.
+  data_filter_manager_.DropPartition(spec_id, partition);
+  delete_filter_manager_.DropPartition(spec_id, std::move(partition));
+}
+
+void MergingSnapshotUpdate::FailMissingDeletePaths() {
+  data_filter_manager_.FailMissingDeletePaths();
+  delete_filter_manager_.FailMissingDeletePaths();
+}
+
+void MergingSnapshotUpdate::FailAnyDelete() {
+  data_filter_manager_.FailAnyDelete();
+  delete_filter_manager_.FailAnyDelete();
+}
+
+void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t 
sequence_number) {
+  new_data_files_data_seq_number_ = sequence_number;
+}
+
+void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  data_filter_manager_.CaseSensitive(case_sensitive);
+  delete_filter_manager_.CaseSensitive(case_sensitive);
+}
+
+void MergingSnapshotUpdate::Set(const std::string& property, const 
std::string& value) {
+  summary_builder().Set(property, value);
+}
+
+Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const 
{
+  if (new_data_files_by_spec_.empty()) {
+    return InvalidArgument("DataSpec() called before any data file was added");
+  }
+  if (new_data_files_by_spec_.size() > 1) {
+    return InvalidArgument(
+        "DataSpec() requires exactly one partition spec; got {} different 
specs",
+        new_data_files_by_spec_.size());
+  }
+  return base().PartitionSpecById(new_data_files_by_spec_.begin()->first);
+}
+
+std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() 
const {
+  std::vector<std::shared_ptr<DataFile>> result;
+  for (const auto& [spec_id, files] : new_data_files_by_spec_) {
+    for (const auto& file : files) {
+      result.push_back(file);
+    }
+  }
+  return result;
+}
+
+Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) {
+  if (manifest.content != ManifestContent::kData) {
+    return InvalidArgument("Cannot append delete manifest: {}", 
manifest.manifest_path);
+  }
+  if (can_inherit_snapshot_id() && manifest.added_snapshot_id == 
kInvalidSnapshotId) {
+    if (manifest.first_row_id.has_value()) {
+      return InvalidArgument("Cannot append manifest with assigned first row 
ID: {}",
+                             manifest.manifest_path);
+    }
+    appended_manifests_summary_.AddedManifest(manifest);
+    append_manifests_.push_back(std::move(manifest));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest));
+    rewritten_append_manifests_.push_back(std::move(copied));
+  }
+  return {};
+}
+
+Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& 
manifest) {
+  const TableMetadata& current = base();
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, SnapshotUtil::SchemaFor(current, 
target_branch()));
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
current.PartitionSpecById(manifest.partition_spec_id));
+  std::string path = ManifestPath();
+  all_written_manifests_.insert(path);
+  return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, 
SnapshotId(), path,
+                            current.format_version, 
&appended_manifests_summary_);
+}
+
+// -------------------------------------------------------------------------
+// State queries
+// -------------------------------------------------------------------------
+
+bool MergingSnapshotUpdate::AddsDataFiles() const {
+  return !new_data_files_by_spec_.empty();
+}
+
+bool MergingSnapshotUpdate::AddsDeleteFiles() const { return 
!new_delete_files_.empty(); }
+
+bool MergingSnapshotUpdate::DeletesDataFiles() const {
+  return data_filter_manager_.ContainsDeletes();
+}
+
+bool MergingSnapshotUpdate::DeletesDeleteFiles() const {
+  return delete_filter_manager_.ContainsDeletes();
+}
+
+// -------------------------------------------------------------------------
+// Apply pipeline
+// -------------------------------------------------------------------------
+
+ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory(
+    const std::shared_ptr<Schema>& schema) {
+  return
+      [this, schema](int32_t spec_id,
+                     ManifestContent content) -> 
Result<std::unique_ptr<ManifestWriter>> {
+        const TableMetadata& meta = base();
+        ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id));
+        std::string path = ManifestPath();
+        all_written_manifests_.insert(path);
+        return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(),
+                                          std::move(path), ctx_->table->io(),
+                                          std::move(spec), schema, content);
+      };
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDataManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_data_files_ && cached_new_data_manifests_.has_value()) {
+    for (const auto& m : *cached_new_data_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_data_manifests_.reset();
+  }
+
+  if (cached_new_data_manifests_.has_value()) {
+    return *cached_new_data_manifests_;
+  }
+
+  std::vector<ManifestFile> result;
+  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto written,
+        WriteDataManifests(data_files.as_span(), spec, 
new_data_files_data_seq_number_));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_data_manifests_ = result;
+  has_new_data_files_ = false;
+  return result;
+}
+
+Result<std::vector<MergingSnapshotUpdate::PendingDeleteFile>>
+MergingSnapshotUpdate::NormalizeNewDeleteFiles() const {
+  std::vector<PendingDeleteFile> result;
+  result.reserve(new_delete_files_.size());
+
+  std::unordered_set<DeleteFileObjectKey, DeleteFileObjectKeyHash> 
seen_delete_files;
+  std::unordered_map<std::string, DeleteFileObjectKey> 
dv_by_referenced_data_file;
+
+  for (const auto& pending_file : new_delete_files_) {
+    const auto& file = pending_file.file;
+    ICEBERG_PRECHECK(file != nullptr, "Cannot add a null delete file");
+
+    auto key = MakeDeleteFileObjectKey(*file);
+    if (!seen_delete_files.insert(key).second) {
+      continue;
+    }
+
+    if (ContentFileUtil::IsDV(*file)) {
+      ICEBERG_PRECHECK(file->referenced_data_file.has_value(),
+                       "DV must have a referenced data file: {}", 
file->file_path);
+      auto [it, inserted] =
+          dv_by_referenced_data_file.emplace(*file->referenced_data_file, key);
+      if (!inserted && it->second != key) {

Review Comment:
   The current implementation fails closed because C++ does not yet have the 
Java DVUtil/Puffin write path needed to merge multiple staged DVs. I’ll narrow 
this PR to avoid claiming Java parity for staged DV merging.



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <algorithm>
+#include <span>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/delete_file_index.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+namespace {
+
+bool MatchesOperation(std::optional<std::string_view> operation,
+                      std::initializer_list<std::string_view> expected) {
+  return operation.has_value() &&
+         std::ranges::find(expected, operation.value()) != expected.end();
+}
+
+struct ValidationHistoryResult {
+  std::vector<ManifestFile> manifests;
+  std::unordered_set<int64_t> snapshot_ids;
+};
+
+struct DeleteFileObjectKey {
+  std::string path;
+  std::optional<int64_t> content_offset;
+  std::optional<int64_t> content_size_in_bytes;
+
+  bool operator==(const DeleteFileObjectKey& other) const = default;
+};
+
+struct DeleteFileObjectKeyHash {
+  size_t operator()(const DeleteFileObjectKey& key) const {
+    size_t hash = std::hash<std::string>{}(key.path);
+    auto combine = [&hash](const auto& value) {
+      size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
+      hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
+    };
+    combine(key.content_offset);
+    combine(key.content_size_in_bytes);
+    return hash;
+  }
+};
+
+DeleteFileObjectKey MakeDeleteFileObjectKey(const DataFile& file) {
+  return DeleteFileObjectKey{.path = file.file_path,
+                             .content_offset = file.content_offset,
+                             .content_size_in_bytes = 
file.content_size_in_bytes};
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> ValidationAncestorsBetween(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      SnapshotUtil::AncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+  if (latest_snapshot_id == starting_snapshot_id) {
+    return ancestors;
+  }
+  if (ancestors.empty()) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+
+  const auto& oldest_checked = ancestors.back();
+  if (oldest_checked == nullptr || 
!oldest_checked->parent_snapshot_id.has_value() ||
+      oldest_checked->parent_snapshot_id.value() != starting_snapshot_id) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+  return ancestors;
+}
+
+Result<ValidationHistoryResult> ValidationHistory(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id,
+    std::initializer_list<std::string_view> matching_operations, 
ManifestContent content,
+    const std::shared_ptr<FileIO>& io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+
+  ValidationHistoryResult result;
+  for (const auto& snapshot : ancestors) {
+    if (!MatchesOperation(snapshot->Operation(), matching_operations)) {
+      continue;
+    }
+
+    result.snapshot_ids.insert(snapshot->snapshot_id);
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, content == ManifestContent::kData
+                                                ? cached.DataManifests(io)
+                                                : cached.DeleteManifests(io));
+    for (const auto& manifest : manifests) {
+      if (manifest.added_snapshot_id == snapshot->snapshot_id) {
+        result.manifests.push_back(manifest);
+      }
+    }
+  }
+
+  return result;
+}
+
+Result<std::optional<std::string>> FindMatchingDataFile(
+    const TableMetadata& metadata, const std::vector<ManifestFile>& manifests,
+    ManifestStatus status, std::shared_ptr<Expression> filter,
+    const PartitionSet* partition_set, const std::shared_ptr<FileIO>& io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  auto partition_filter = partition_set != nullptr
+                              ? std::make_shared<PartitionSet>(*partition_set)
+                              : std::shared_ptr<PartitionSet>{};
+
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                            
metadata.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io, schema, spec));
+    reader->CaseSensitive(case_sensitive);
+    if (filter != nullptr) {
+      reader->FilterRows(filter);
+    }
+    if (partition_filter != nullptr) {
+      reader->FilterPartitions(partition_filter);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      if (entry.status == status && entry.data_file != nullptr) {
+        return entry.data_file->file_path;
+      }
+    }
+  }
+
+  return std::optional<std::string>{};
+}
+
+}  // namespace
+
+MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name,
+                                             
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)),
+      table_name_(std::move(table_name)),
+      delete_expression_(Expressions::AlwaysFalse()),
+      data_filter_manager_(ManifestContent::kData, ctx_->table->io()),
+      delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()),
+      data_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)),
+      delete_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)) {}
+
+// -------------------------------------------------------------------------
+// Primitive API
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null data file");
+  }
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Data file must have a partition spec ID");
+  }
+
+  int32_t spec_id = file->partition_spec_id.value();
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+
+  // Suppress first_row_id — it will be assigned by the commit, not inherited 
from the
+  // source file.
+  file->first_row_id = std::nullopt;
+
+  auto& data_files = new_data_files_by_spec_[spec_id];
+  auto [it, inserted] = data_files.insert(file);
+  if (inserted) {
+    has_new_data_files_ = true;
+    ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, 
*file));
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) {
+  if (file.content == DataFile::Content::kData) {
+    return InvalidArgument("Expected a delete file but got a data file: {}",
+                           file.file_path);
+  }
+  const int8_t format_version = base().format_version;
+  const bool is_dv = ContentFileUtil::IsDV(file);
+  switch (format_version) {
+    case 1:
+      return InvalidArgument("Deletes are supported in V2 and above");
+    case 2:
+      // Position deletes must NOT be DVs in v2.
+      if (file.content == DataFile::Content::kPositionDeletes && is_dv) {
+        return InvalidArgument("Must not use DVs for position deletes in V2: 
{}",
+                               file.file_path);
+      }
+      break;
+    default:
+      if (format_version >= 3) {
+        // Position deletes MUST be DVs in v3+.
+        if (file.content == DataFile::Content::kPositionDeletes && !is_dv) {
+          return InvalidArgument("Must use DVs for position deletes in V{}: 
{}",
+                                 format_version, file.file_path);
+        }
+      } else {
+        return InvalidArgument("Unsupported format version: {}", 
format_version);
+      }
+      break;
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) {
+  return AddDeleteFile(std::move(file), std::nullopt);
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            int64_t data_sequence_number) {
+  return AddDeleteFile(std::move(file), 
std::optional<int64_t>(data_sequence_number));
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            std::optional<int64_t> 
data_sequence_number) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null delete file");
+  }
+  ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file));
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Delete file must have a partition spec ID");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
base().PartitionSpecById(file->partition_spec_id.value()));
+  (void)spec;
+  has_new_delete_files_ = true;
+  new_delete_files_.push_back(PendingDeleteFile{
+      .file = std::move(file), .data_sequence_number = 
std::move(data_sequence_number)});
+  return {};
+}
+
+Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot delete a null data file");
+  }
+  return data_filter_manager_.DeleteFile(std::move(file));
+}
+
+Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) 
{
+  if (!file) {
+    return InvalidArgument("Cannot delete a null delete file");
+  }
+  return delete_filter_manager_.DeleteFile(std::move(file));
+}
+
+void MergingSnapshotUpdate::DeleteByPath(std::string_view path) {
+  data_filter_manager_.DeleteFile(path);
+}
+
+Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> 
expr) {
+  // If a delete file matches the row filter, it can also be removed because 
the rows
+  // it references will also be deleted. Both filter managers receive the 
expression.
+  delete_expression_ = expr;
+  ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr));
+  return delete_filter_manager_.DeleteByRowFilter(std::move(expr));
+}
+
+void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  // Dropping data in a partition also drops all delete files in that 
partition.
+  data_filter_manager_.DropPartition(spec_id, partition);
+  delete_filter_manager_.DropPartition(spec_id, std::move(partition));
+}
+
+void MergingSnapshotUpdate::FailMissingDeletePaths() {
+  data_filter_manager_.FailMissingDeletePaths();
+  delete_filter_manager_.FailMissingDeletePaths();
+}
+
+void MergingSnapshotUpdate::FailAnyDelete() {
+  data_filter_manager_.FailAnyDelete();
+  delete_filter_manager_.FailAnyDelete();
+}
+
+void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t 
sequence_number) {
+  new_data_files_data_seq_number_ = sequence_number;
+}
+
+void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  data_filter_manager_.CaseSensitive(case_sensitive);
+  delete_filter_manager_.CaseSensitive(case_sensitive);
+}
+
+void MergingSnapshotUpdate::Set(const std::string& property, const 
std::string& value) {
+  summary_builder().Set(property, value);
+}
+
+Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const 
{
+  if (new_data_files_by_spec_.empty()) {
+    return InvalidArgument("DataSpec() called before any data file was added");
+  }
+  if (new_data_files_by_spec_.size() > 1) {
+    return InvalidArgument(
+        "DataSpec() requires exactly one partition spec; got {} different 
specs",
+        new_data_files_by_spec_.size());
+  }
+  return base().PartitionSpecById(new_data_files_by_spec_.begin()->first);
+}
+
+std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() 
const {
+  std::vector<std::shared_ptr<DataFile>> result;
+  for (const auto& [spec_id, files] : new_data_files_by_spec_) {
+    for (const auto& file : files) {
+      result.push_back(file);
+    }
+  }
+  return result;
+}
+
+Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) {
+  if (manifest.content != ManifestContent::kData) {
+    return InvalidArgument("Cannot append delete manifest: {}", 
manifest.manifest_path);
+  }
+  if (can_inherit_snapshot_id() && manifest.added_snapshot_id == 
kInvalidSnapshotId) {
+    if (manifest.first_row_id.has_value()) {
+      return InvalidArgument("Cannot append manifest with assigned first row 
ID: {}",
+                             manifest.manifest_path);
+    }
+    appended_manifests_summary_.AddedManifest(manifest);
+    append_manifests_.push_back(std::move(manifest));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest));
+    rewritten_append_manifests_.push_back(std::move(copied));
+  }
+  return {};
+}
+
+Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& 
manifest) {
+  const TableMetadata& current = base();
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, SnapshotUtil::SchemaFor(current, 
target_branch()));
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
current.PartitionSpecById(manifest.partition_spec_id));
+  std::string path = ManifestPath();
+  all_written_manifests_.insert(path);
+  return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, 
SnapshotId(), path,
+                            current.format_version, 
&appended_manifests_summary_);
+}
+
+// -------------------------------------------------------------------------
+// State queries
+// -------------------------------------------------------------------------
+
+bool MergingSnapshotUpdate::AddsDataFiles() const {
+  return !new_data_files_by_spec_.empty();
+}
+
+bool MergingSnapshotUpdate::AddsDeleteFiles() const { return 
!new_delete_files_.empty(); }
+
+bool MergingSnapshotUpdate::DeletesDataFiles() const {
+  return data_filter_manager_.ContainsDeletes();
+}
+
+bool MergingSnapshotUpdate::DeletesDeleteFiles() const {
+  return delete_filter_manager_.ContainsDeletes();
+}
+
+// -------------------------------------------------------------------------
+// Apply pipeline
+// -------------------------------------------------------------------------
+
+ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory(
+    const std::shared_ptr<Schema>& schema) {
+  return
+      [this, schema](int32_t spec_id,
+                     ManifestContent content) -> 
Result<std::unique_ptr<ManifestWriter>> {
+        const TableMetadata& meta = base();
+        ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id));
+        std::string path = ManifestPath();
+        all_written_manifests_.insert(path);
+        return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(),
+                                          std::move(path), ctx_->table->io(),
+                                          std::move(spec), schema, content);
+      };
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDataManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_data_files_ && cached_new_data_manifests_.has_value()) {
+    for (const auto& m : *cached_new_data_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_data_manifests_.reset();
+  }
+
+  if (cached_new_data_manifests_.has_value()) {
+    return *cached_new_data_manifests_;
+  }
+
+  std::vector<ManifestFile> result;
+  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto written,
+        WriteDataManifests(data_files.as_span(), spec, 
new_data_files_data_seq_number_));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_data_manifests_ = result;
+  has_new_data_files_ = false;
+  return result;
+}
+
+Result<std::vector<MergingSnapshotUpdate::PendingDeleteFile>>
+MergingSnapshotUpdate::NormalizeNewDeleteFiles() const {
+  std::vector<PendingDeleteFile> result;
+  result.reserve(new_delete_files_.size());
+
+  std::unordered_set<DeleteFileObjectKey, DeleteFileObjectKeyHash> 
seen_delete_files;
+  std::unordered_map<std::string, DeleteFileObjectKey> 
dv_by_referenced_data_file;
+
+  for (const auto& pending_file : new_delete_files_) {
+    const auto& file = pending_file.file;
+    ICEBERG_PRECHECK(file != nullptr, "Cannot add a null delete file");
+
+    auto key = MakeDeleteFileObjectKey(*file);
+    if (!seen_delete_files.insert(key).second) {
+      continue;
+    }
+
+    if (ContentFileUtil::IsDV(*file)) {
+      ICEBERG_PRECHECK(file->referenced_data_file.has_value(),
+                       "DV must have a referenced data file: {}", 
file->file_path);
+      auto [it, inserted] =
+          dv_by_referenced_data_file.emplace(*file->referenced_data_file, key);
+      if (!inserted && it->second != key) {
+        return NotImplemented(
+            "Cannot merge multiple deletion vectors for referenced data file: 
{}",
+            *file->referenced_data_file);
+      }
+    }
+
+    result.push_back(pending_file);
+  }
+
+  return result;
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDeleteManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_delete_files_ && cached_new_delete_manifests_.has_value()) {
+    for (const auto& m : *cached_new_delete_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_delete_manifests_.reset();
+  }
+
+  if (cached_new_delete_manifests_.has_value()) {
+    return *cached_new_delete_manifests_;
+  }
+
+  // Group delete files by partition spec ID, mirroring 
WriteNewDataManifests().
+  std::unordered_map<int32_t, std::vector<PendingDeleteFile>> 
delete_files_by_spec;
+  for (const auto& pending_file : new_delete_files_) {
+    
delete_files_by_spec[pending_file.file->partition_spec_id.value()].push_back(
+        pending_file);
+  }
+
+  std::vector<ManifestFile> result;
+  for (auto& [spec_id, delete_files] : delete_files_by_spec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    std::vector<DeleteManifestEntry> delete_entries;
+    delete_entries.reserve(delete_files.size());
+    for (const auto& pending_file : delete_files) {
+      delete_entries.push_back(DeleteManifestEntry{
+          .file = pending_file.file,
+          .data_sequence_number = pending_file.data_sequence_number,
+      });
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto written, WriteDeleteManifests(delete_entries, 
spec));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_delete_manifests_ = result;
+  has_new_delete_files_ = false;
+  return result;
+}
+
+Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply(
+    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& 
snapshot) {
+  // Re-validate buffered delete files against the current format version. A 
format
+  // upgrade between staging and commit could make previously-valid files 
invalid.
+  for (const auto& pending_file : new_delete_files_) {
+    ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*pending_file.file));
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto normalized_delete_files, 
NormalizeNewDeleteFiles());
+  new_delete_files_ = std::move(normalized_delete_files);
+
+  added_delete_files_summary_.Clear();
+  for (const auto& pending_file : new_delete_files_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata_to_update.PartitionSpecById(
+                                           
*pending_file.file->partition_spec_id));
+    ICEBERG_RETURN_UNEXPECTED(
+        added_delete_files_summary_.AddedFile(*spec, *pending_file.file));
+  }
+
+  // Rebuild summary from stable sub-builders so that commit retries don't 
double-count.
+  summary_builder().Clear();

Review Comment:
   Good catch. This does clear properties set through Set(). I’ll persist 
custom summary properties separately and reapply them after rebuilding the 
summary builder in Apply().



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <algorithm>
+#include <span>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/delete_file_index.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+namespace {
+
+bool MatchesOperation(std::optional<std::string_view> operation,
+                      std::initializer_list<std::string_view> expected) {
+  return operation.has_value() &&
+         std::ranges::find(expected, operation.value()) != expected.end();
+}
+
+struct ValidationHistoryResult {
+  std::vector<ManifestFile> manifests;
+  std::unordered_set<int64_t> snapshot_ids;
+};
+
+struct DeleteFileObjectKey {
+  std::string path;
+  std::optional<int64_t> content_offset;
+  std::optional<int64_t> content_size_in_bytes;
+
+  bool operator==(const DeleteFileObjectKey& other) const = default;
+};
+
+struct DeleteFileObjectKeyHash {
+  size_t operator()(const DeleteFileObjectKey& key) const {
+    size_t hash = std::hash<std::string>{}(key.path);
+    auto combine = [&hash](const auto& value) {
+      size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
+      hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
+    };
+    combine(key.content_offset);
+    combine(key.content_size_in_bytes);
+    return hash;
+  }
+};
+
+DeleteFileObjectKey MakeDeleteFileObjectKey(const DataFile& file) {
+  return DeleteFileObjectKey{.path = file.file_path,
+                             .content_offset = file.content_offset,
+                             .content_size_in_bytes = 
file.content_size_in_bytes};
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> ValidationAncestorsBetween(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      SnapshotUtil::AncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+  if (latest_snapshot_id == starting_snapshot_id) {
+    return ancestors;
+  }
+  if (ancestors.empty()) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+
+  const auto& oldest_checked = ancestors.back();
+  if (oldest_checked == nullptr || 
!oldest_checked->parent_snapshot_id.has_value() ||
+      oldest_checked->parent_snapshot_id.value() != starting_snapshot_id) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+  return ancestors;
+}
+
+Result<ValidationHistoryResult> ValidationHistory(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id,
+    std::initializer_list<std::string_view> matching_operations, 
ManifestContent content,
+    const std::shared_ptr<FileIO>& io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+
+  ValidationHistoryResult result;
+  for (const auto& snapshot : ancestors) {
+    if (!MatchesOperation(snapshot->Operation(), matching_operations)) {
+      continue;
+    }
+
+    result.snapshot_ids.insert(snapshot->snapshot_id);
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, content == ManifestContent::kData
+                                                ? cached.DataManifests(io)
+                                                : cached.DeleteManifests(io));
+    for (const auto& manifest : manifests) {
+      if (manifest.added_snapshot_id == snapshot->snapshot_id) {
+        result.manifests.push_back(manifest);
+      }
+    }
+  }
+
+  return result;
+}
+
+Result<std::optional<std::string>> FindMatchingDataFile(
+    const TableMetadata& metadata, const std::vector<ManifestFile>& manifests,
+    ManifestStatus status, std::shared_ptr<Expression> filter,
+    const PartitionSet* partition_set, const std::shared_ptr<FileIO>& io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  auto partition_filter = partition_set != nullptr
+                              ? std::make_shared<PartitionSet>(*partition_set)
+                              : std::shared_ptr<PartitionSet>{};
+
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                            
metadata.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io, schema, spec));
+    reader->CaseSensitive(case_sensitive);
+    if (filter != nullptr) {
+      reader->FilterRows(filter);
+    }
+    if (partition_filter != nullptr) {
+      reader->FilterPartitions(partition_filter);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      if (entry.status == status && entry.data_file != nullptr) {
+        return entry.data_file->file_path;
+      }
+    }
+  }
+
+  return std::optional<std::string>{};
+}
+
+}  // namespace
+
+MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name,
+                                             
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)),
+      table_name_(std::move(table_name)),
+      delete_expression_(Expressions::AlwaysFalse()),
+      data_filter_manager_(ManifestContent::kData, ctx_->table->io()),
+      delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()),
+      data_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)),
+      delete_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)) {}
+
+// -------------------------------------------------------------------------
+// Primitive API
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null data file");
+  }
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Data file must have a partition spec ID");
+  }
+
+  int32_t spec_id = file->partition_spec_id.value();
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+
+  // Suppress first_row_id — it will be assigned by the commit, not inherited 
from the
+  // source file.
+  file->first_row_id = std::nullopt;
+
+  auto& data_files = new_data_files_by_spec_[spec_id];
+  auto [it, inserted] = data_files.insert(file);
+  if (inserted) {
+    has_new_data_files_ = true;
+    ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, 
*file));
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) {
+  if (file.content == DataFile::Content::kData) {
+    return InvalidArgument("Expected a delete file but got a data file: {}",
+                           file.file_path);
+  }
+  const int8_t format_version = base().format_version;
+  const bool is_dv = ContentFileUtil::IsDV(file);
+  switch (format_version) {
+    case 1:
+      return InvalidArgument("Deletes are supported in V2 and above");
+    case 2:
+      // Position deletes must NOT be DVs in v2.
+      if (file.content == DataFile::Content::kPositionDeletes && is_dv) {
+        return InvalidArgument("Must not use DVs for position deletes in V2: 
{}",
+                               file.file_path);
+      }
+      break;
+    default:
+      if (format_version >= 3) {
+        // Position deletes MUST be DVs in v3+.
+        if (file.content == DataFile::Content::kPositionDeletes && !is_dv) {
+          return InvalidArgument("Must use DVs for position deletes in V{}: 
{}",
+                                 format_version, file.file_path);
+        }
+      } else {
+        return InvalidArgument("Unsupported format version: {}", 
format_version);
+      }
+      break;
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) {
+  return AddDeleteFile(std::move(file), std::nullopt);
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            int64_t data_sequence_number) {
+  return AddDeleteFile(std::move(file), 
std::optional<int64_t>(data_sequence_number));
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            std::optional<int64_t> 
data_sequence_number) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null delete file");
+  }
+  ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file));
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Delete file must have a partition spec ID");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
base().PartitionSpecById(file->partition_spec_id.value()));
+  (void)spec;

Review Comment:
   Agreed. This is only validating that the spec exists, so I’ll replace the 
unused assignment with ICEBERG_RETURN_UNEXPECTED



##########
src/iceberg/test/merging_snapshot_update_test.cc:
##########
@@ -0,0 +1,1227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/constants.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/update_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// \brief Concrete subclass of MergingSnapshotUpdate for testing.
+class TestMergeAppend : public MergingSnapshotUpdate {
+ public:
+  static Result<std::unique_ptr<TestMergeAppend>> Make(std::string table_name,
+                                                       std::shared_ptr<Table> 
table) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto ctx, TransactionContext::Make(std::move(table), 
TransactionKind::kUpdate));
+    return std::unique_ptr<TestMergeAppend>(
+        new TestMergeAppend(std::move(table_name), std::move(ctx)));
+  }
+
+  std::string operation() override { return "append"; }
+
+  // Expose protected API for test access
+  Status AddFile(std::shared_ptr<DataFile> file) { return 
AddDataFile(std::move(file)); }
+  Status AddDelete(std::shared_ptr<DataFile> file) {
+    return AddDeleteFile(std::move(file));
+  }
+  Status AddDelete(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number) {
+    return AddDeleteFile(std::move(file), data_sequence_number);
+  }
+  Status RemoveDataFile(std::shared_ptr<DataFile> file) {
+    return DeleteDataFile(std::move(file));
+  }
+  Status RemoveDeleteFile(std::shared_ptr<DataFile> file) {
+    return DeleteDeleteFile(std::move(file));
+  }
+  Status AppendManifest(ManifestFile manifest) {
+    return AddManifest(std::move(manifest));
+  }
+  Result<std::shared_ptr<PartitionSpec>> DataSpec() const {
+    return MergingSnapshotUpdate::DataSpec();
+  }
+  int64_t GeneratedSnapshotId() { return SnapshotId(); }
+  void SetDataSeqNumber(int64_t seq) { SetNewDataFilesDataSequenceNumber(seq); 
}
+  static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
+                                              int64_t starting_snapshot_id,
+                                              const std::shared_ptr<Snapshot>& 
parent,
+                                              std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDataFiles(metadata, 
starting_snapshot_id,
+                                                         nullptr, parent, 
std::move(io));
+  }
+  static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
+                                              int64_t starting_snapshot_id,
+                                              const PartitionSet& 
partition_set,
+                                              const std::shared_ptr<Snapshot>& 
parent,
+                                              std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDataFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateNoNewDeletesForDataFilesForTest(
+      const TableMetadata& metadata, int64_t starting_snapshot_id,
+      std::shared_ptr<Expression> data_filter, const DataFileSet& 
replaced_files,
+      const std::shared_ptr<Snapshot>& parent, std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), 
replaced_files, parent,
+        std::move(io));
+  }
+  static Status ValidateNoNewDeleteFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                std::shared_ptr<Expression> 
data_filter,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), parent, 
std::move(io));
+  }
+  static Status ValidateNoNewDeleteFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                const PartitionSet& 
partition_set,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateDeletedDataFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                std::shared_ptr<Expression> 
data_filter,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateDeletedDataFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), parent, 
std::move(io));
+  }
+  static Status ValidateDeletedDataFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                const PartitionSet& 
partition_set,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateDeletedDataFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateAddedDVsForTest(const TableMetadata& metadata,
+                                        int64_t starting_snapshot_id,
+                                        std::shared_ptr<Expression> 
conflict_filter,
+                                        const std::shared_ptr<Snapshot>& 
parent,
+                                        std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDVs(metadata, 
starting_snapshot_id,
+                                                   std::move(conflict_filter), 
parent,
+                                                   std::move(io));
+  }
+
+  bool HasDataFiles() const { return AddsDataFiles(); }
+  bool HasDeleteFiles() const { return AddsDeleteFiles(); }
+  bool HasDataDeletes() const { return DeletesDataFiles(); }
+
+ private:
+  TestMergeAppend(std::string table_name, std::shared_ptr<TransactionContext> 
ctx)
+      : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {}
+};
+
+class TestOverwriteUpdate : public MergingSnapshotUpdate {
+ public:
+  static Result<std::unique_ptr<TestOverwriteUpdate>> Make(std::string 
table_name,
+                                                           
std::shared_ptr<Table> table) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto ctx, TransactionContext::Make(std::move(table), 
TransactionKind::kUpdate));
+    return std::unique_ptr<TestOverwriteUpdate>(
+        new TestOverwriteUpdate(std::move(table_name), std::move(ctx)));
+  }
+
+  std::string operation() override { return DataOperation::kOverwrite; }
+  int64_t GeneratedSnapshotId() { return SnapshotId(); }
+
+  Status AddDelete(std::shared_ptr<DataFile> file) {
+    return AddDeleteFile(std::move(file));
+  }
+  Status AddDelete(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number) {
+    return AddDeleteFile(std::move(file), data_sequence_number);
+  }
+  Status RemoveDataFile(std::shared_ptr<DataFile> file) {
+    return DeleteDataFile(std::move(file));
+  }
+
+ private:
+  TestOverwriteUpdate(std::string table_name, 
std::shared_ptr<TransactionContext> ctx)
+      : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {}
+};
+
+class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    MinimalUpdateTestBase::SetUp();
+
+    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
+    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());
+
+    file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L);
+    file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L);
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path, int64_t 
partition_x) {
+    auto f = std::make_shared<DataFile>();
+    f->content = DataFile::Content::kData;
+    f->file_path = table_location_ + path;
+    f->file_format = FileFormatType::kParquet;
+    f->partition = 
PartitionValues(std::vector<Literal>{Literal::Long(partition_x)});
+    f->file_size_in_bytes = 1024;
+    f->record_count = 100;
+    f->partition_spec_id = spec_->spec_id();
+    return f;
+  }
+
+  std::shared_ptr<DataFile> MakeDeleteFile(const std::string& path, int64_t 
partition_x) {
+    auto f = MakeDataFile(path, partition_x);
+    f->content = DataFile::Content::kPositionDeletes;
+    return f;
+  }
+
+  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+                                                   int64_t partition_x) {
+    auto f = MakeDeleteFile(path, partition_x);
+    f->content = DataFile::Content::kEqualityDeletes;
+    f->equality_ids = {1};
+    return f;
+  }
+
+  Result<std::unique_ptr<TestMergeAppend>> NewMergeAppend() {
+    return TestMergeAppend::Make(TableName(), table_);
+  }
+
+  Result<std::unique_ptr<TestOverwriteUpdate>> NewOverwriteUpdate() {
+    return TestOverwriteUpdate::Make(TableName(), table_);
+  }
+
+  // Commit file_a_ with FastAppend and refresh the table.
+  void CommitFileA() {
+    ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend());
+    fa->AppendFile(file_a_);
+    EXPECT_THAT(fa->Commit(), IsOk());
+    EXPECT_THAT(table_->Refresh(), IsOk());
+  }
+
+  // Read all entries from a list of ManifestFiles.
+  Result<std::vector<ManifestEntry>> ReadAllEntries(
+      const std::vector<ManifestFile>& manifests, const TableMetadata& 
metadata) {
+    std::vector<ManifestEntry> result;
+    for (const auto& m : manifests) {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(m.partition_spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(m, file_io_, schema, spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+      result.insert(result.end(), entries.begin(), entries.end());
+    }
+    return result;
+  }
+
+  // Write a manifest file containing the given data files.
+  // Returns a ManifestFile with added_snapshot_id = kInvalidSnapshotId so it
+  // is eligible for snapshot ID inheritance.
+  Result<ManifestFile> WriteManifest(
+      const std::string& path, const std::vector<std::shared_ptr<DataFile>>& 
files) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestWriter::MakeWriter(/*format_version=*/2, kInvalidSnapshotId, 
path,
+                                   file_io_, spec_, schema_, 
ManifestContent::kData));
+    for (const auto& f : files) {
+      ManifestEntry entry;
+      entry.status = ManifestStatus::kAdded;
+      entry.snapshot_id = std::nullopt;
+      entry.data_file = f;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->ToManifestFile();
+  }
+
+  Result<ManifestFile> WriteDeleteManifest(
+      const TableMetadata& metadata, const std::string& path,
+      const std::vector<std::shared_ptr<DataFile>>& files, int64_t 
sequence_number) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(spec_->spec_id()));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestWriter::MakeWriter(metadata.format_version, 
/*snapshot_id=*/1L, path,
+                                   file_io_, spec, schema, 
ManifestContent::kDeletes));
+    for (const auto& f : files) {
+      ManifestEntry entry;
+      entry.status = ManifestStatus::kAdded;
+      entry.snapshot_id = 1L;
+      entry.sequence_number = sequence_number;
+      entry.data_file = f;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->ToManifestFile();
+  }
+
+  Result<std::shared_ptr<Snapshot>> MakeSyntheticSnapshot(
+      std::string operation, int64_t snapshot_id,
+      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
+      const std::vector<ManifestFile>& manifests) {
+    auto manifest_list_path = table_location_ + "/metadata/manifest-list-" +
+                              std::to_string(snapshot_id) + ".avro";
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestListWriter::MakeWriter(table_->metadata()->format_version, 
snapshot_id,
+                                       parent_snapshot_id, manifest_list_path, 
file_io_,
+                                       sequence_number));
+    ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto snapshot,
+        Snapshot::Make(sequence_number, snapshot_id, parent_snapshot_id, 
TimePointMs{},
+                       std::move(operation), {}, 
table_->metadata()->current_schema_id,
+                       manifest_list_path));
+    return std::shared_ptr<Snapshot>(std::move(snapshot));
+  }
+
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<DataFile> file_a_;
+  std::shared_ptr<DataFile> file_b_;
+};
+
+// -------------------------------------------------------------------------
+// State query tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, AddsDataFilesInitiallyFalse) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_FALSE(op->HasDataFiles());
+  EXPECT_FALSE(op->HasDeleteFiles());
+  EXPECT_FALSE(op->HasDataDeletes());
+}
+
+TEST_F(MergingSnapshotUpdateTest, AddsDataFilesTrueAfterAdd) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_TRUE(op->HasDataFiles());
+  EXPECT_FALSE(op->HasDeleteFiles());
+}
+
+TEST_F(MergingSnapshotUpdateTest, AddsDeleteFilesTrueAfterAdd) {
+  auto del_file = MakeEqualityDeleteFile("/delete/del_a.parquet", 1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_FALSE(op->HasDataFiles());
+  EXPECT_TRUE(op->HasDeleteFiles());
+}
+
+TEST_F(MergingSnapshotUpdateTest, DeletesDataFilesTrueAfterRegisterDelete) {
+  CommitFileA();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->RemoveDataFile(file_a_), IsOk());
+  EXPECT_TRUE(op->HasDataDeletes());
+}
+
+// -------------------------------------------------------------------------
+// Apply / Commit tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, CommitNewDataFile) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at("added-data-files"), "1");

Review Comment:
   Makes sense. I’ll switch these assertions to SnapshotSummaryFields constants 
to keep the test consistent with the rest of the file



##########
src/iceberg/update/merging_snapshot_update.cc:
##########
@@ -0,0 +1,1090 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <algorithm>
+#include <span>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/constants.h"
+#include "iceberg/delete_file_index.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/expression/inclusive_metrics_evaluator.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_util_internal.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/content_file_util.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+namespace {
+
+bool MatchesOperation(std::optional<std::string_view> operation,
+                      std::initializer_list<std::string_view> expected) {
+  return operation.has_value() &&
+         std::ranges::find(expected, operation.value()) != expected.end();
+}
+
+struct ValidationHistoryResult {
+  std::vector<ManifestFile> manifests;
+  std::unordered_set<int64_t> snapshot_ids;
+};
+
+struct DeleteFileObjectKey {
+  std::string path;
+  std::optional<int64_t> content_offset;
+  std::optional<int64_t> content_size_in_bytes;
+
+  bool operator==(const DeleteFileObjectKey& other) const = default;
+};
+
+struct DeleteFileObjectKeyHash {
+  size_t operator()(const DeleteFileObjectKey& key) const {
+    size_t hash = std::hash<std::string>{}(key.path);
+    auto combine = [&hash](const auto& value) {
+      size_t value_hash = value.has_value() ? std::hash<int64_t>{}(*value) : 0;
+      hash ^= value_hash + 0x9e3779b9 + (hash << 6) + (hash >> 2);
+    };
+    combine(key.content_offset);
+    combine(key.content_size_in_bytes);
+    return hash;
+  }
+};
+
+DeleteFileObjectKey MakeDeleteFileObjectKey(const DataFile& file) {
+  return DeleteFileObjectKey{.path = file.file_path,
+                             .content_offset = file.content_offset,
+                             .content_size_in_bytes = 
file.content_size_in_bytes};
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> ValidationAncestorsBetween(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      SnapshotUtil::AncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+  if (latest_snapshot_id == starting_snapshot_id) {
+    return ancestors;
+  }
+  if (ancestors.empty()) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+
+  const auto& oldest_checked = ancestors.back();
+  if (oldest_checked == nullptr || 
!oldest_checked->parent_snapshot_id.has_value() ||
+      oldest_checked->parent_snapshot_id.value() != starting_snapshot_id) {
+    return InvalidArgument(
+        "Cannot validate history: starting snapshot {} is not an ancestor "
+        "of snapshot {}",
+        starting_snapshot_id, latest_snapshot_id);
+  }
+  return ancestors;
+}
+
+Result<ValidationHistoryResult> ValidationHistory(
+    const TableMetadata& metadata, int64_t latest_snapshot_id,
+    int64_t starting_snapshot_id,
+    std::initializer_list<std::string_view> matching_operations, 
ManifestContent content,
+    const std::shared_ptr<FileIO>& io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, latest_snapshot_id, 
starting_snapshot_id));
+
+  ValidationHistoryResult result;
+  for (const auto& snapshot : ancestors) {
+    if (!MatchesOperation(snapshot->Operation(), matching_operations)) {
+      continue;
+    }
+
+    result.snapshot_ids.insert(snapshot->snapshot_id);
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, content == ManifestContent::kData
+                                                ? cached.DataManifests(io)
+                                                : cached.DeleteManifests(io));
+    for (const auto& manifest : manifests) {
+      if (manifest.added_snapshot_id == snapshot->snapshot_id) {
+        result.manifests.push_back(manifest);
+      }
+    }
+  }
+
+  return result;
+}
+
+Result<std::optional<std::string>> FindMatchingDataFile(
+    const TableMetadata& metadata, const std::vector<ManifestFile>& manifests,
+    ManifestStatus status, std::shared_ptr<Expression> filter,
+    const PartitionSet* partition_set, const std::shared_ptr<FileIO>& io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  auto partition_filter = partition_set != nullptr
+                              ? std::make_shared<PartitionSet>(*partition_set)
+                              : std::shared_ptr<PartitionSet>{};
+
+  for (const auto& manifest : manifests) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                            
metadata.PartitionSpecById(manifest.partition_spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                            ManifestReader::Make(manifest, io, schema, spec));
+    reader->CaseSensitive(case_sensitive);
+    if (filter != nullptr) {
+      reader->FilterRows(filter);
+    }
+    if (partition_filter != nullptr) {
+      reader->FilterPartitions(partition_filter);
+    }
+
+    ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+    for (const auto& entry : entries) {
+      if (entry.status == status && entry.data_file != nullptr) {
+        return entry.data_file->file_path;
+      }
+    }
+  }
+
+  return std::optional<std::string>{};
+}
+
+}  // namespace
+
+MergingSnapshotUpdate::MergingSnapshotUpdate(std::string table_name,
+                                             
std::shared_ptr<TransactionContext> ctx)
+    : SnapshotUpdate(std::move(ctx)),
+      table_name_(std::move(table_name)),
+      delete_expression_(Expressions::AlwaysFalse()),
+      data_filter_manager_(ManifestContent::kData, ctx_->table->io()),
+      delete_filter_manager_(ManifestContent::kDeletes, ctx_->table->io()),
+      data_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)),
+      delete_merge_manager_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes),
+          base().properties.Get(TableProperties::kManifestMinMergeCount),
+          base().properties.Get(TableProperties::kManifestMergeEnabled)) {}
+
+// -------------------------------------------------------------------------
+// Primitive API
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::AddDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null data file");
+  }
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Data file must have a partition spec ID");
+  }
+
+  int32_t spec_id = file->partition_spec_id.value();
+  ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+
+  // Suppress first_row_id — it will be assigned by the commit, not inherited 
from the
+  // source file.
+  file->first_row_id = std::nullopt;
+
+  auto& data_files = new_data_files_by_spec_[spec_id];
+  auto [it, inserted] = data_files.insert(file);
+  if (inserted) {
+    has_new_data_files_ = true;
+    ICEBERG_RETURN_UNEXPECTED(added_data_files_summary_.AddedFile(*spec, 
*file));
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNewDeleteFile(const DataFile& file) {
+  if (file.content == DataFile::Content::kData) {
+    return InvalidArgument("Expected a delete file but got a data file: {}",
+                           file.file_path);
+  }
+  const int8_t format_version = base().format_version;
+  const bool is_dv = ContentFileUtil::IsDV(file);
+  switch (format_version) {
+    case 1:
+      return InvalidArgument("Deletes are supported in V2 and above");
+    case 2:
+      // Position deletes must NOT be DVs in v2.
+      if (file.content == DataFile::Content::kPositionDeletes && is_dv) {
+        return InvalidArgument("Must not use DVs for position deletes in V2: 
{}",
+                               file.file_path);
+      }
+      break;
+    default:
+      if (format_version >= 3) {
+        // Position deletes MUST be DVs in v3+.
+        if (file.content == DataFile::Content::kPositionDeletes && !is_dv) {
+          return InvalidArgument("Must use DVs for position deletes in V{}: 
{}",
+                                 format_version, file.file_path);
+        }
+      } else {
+        return InvalidArgument("Unsupported format version: {}", 
format_version);
+      }
+      break;
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file) {
+  return AddDeleteFile(std::move(file), std::nullopt);
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            int64_t data_sequence_number) {
+  return AddDeleteFile(std::move(file), 
std::optional<int64_t>(data_sequence_number));
+}
+
+Status MergingSnapshotUpdate::AddDeleteFile(std::shared_ptr<DataFile> file,
+                                            std::optional<int64_t> 
data_sequence_number) {
+  if (!file) {
+    return InvalidArgument("Cannot add a null delete file");
+  }
+  ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*file));
+  if (!file->partition_spec_id.has_value()) {
+    return InvalidArgument("Delete file must have a partition spec ID");
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
base().PartitionSpecById(file->partition_spec_id.value()));
+  (void)spec;
+  has_new_delete_files_ = true;
+  new_delete_files_.push_back(PendingDeleteFile{
+      .file = std::move(file), .data_sequence_number = 
std::move(data_sequence_number)});
+  return {};
+}
+
+Status MergingSnapshotUpdate::DeleteDataFile(std::shared_ptr<DataFile> file) {
+  if (!file) {
+    return InvalidArgument("Cannot delete a null data file");
+  }
+  return data_filter_manager_.DeleteFile(std::move(file));
+}
+
+Status MergingSnapshotUpdate::DeleteDeleteFile(std::shared_ptr<DataFile> file) 
{
+  if (!file) {
+    return InvalidArgument("Cannot delete a null delete file");
+  }
+  return delete_filter_manager_.DeleteFile(std::move(file));
+}
+
+void MergingSnapshotUpdate::DeleteByPath(std::string_view path) {
+  data_filter_manager_.DeleteFile(path);
+}
+
+Status MergingSnapshotUpdate::DeleteByRowFilter(std::shared_ptr<Expression> 
expr) {
+  // If a delete file matches the row filter, it can also be removed because 
the rows
+  // it references will also be deleted. Both filter managers receive the 
expression.
+  delete_expression_ = expr;
+  ICEBERG_RETURN_UNEXPECTED(data_filter_manager_.DeleteByRowFilter(expr));
+  return delete_filter_manager_.DeleteByRowFilter(std::move(expr));
+}
+
+void MergingSnapshotUpdate::DropPartition(int32_t spec_id, PartitionValues 
partition) {
+  // Dropping data in a partition also drops all delete files in that 
partition.
+  data_filter_manager_.DropPartition(spec_id, partition);
+  delete_filter_manager_.DropPartition(spec_id, std::move(partition));
+}
+
+void MergingSnapshotUpdate::FailMissingDeletePaths() {
+  data_filter_manager_.FailMissingDeletePaths();
+  delete_filter_manager_.FailMissingDeletePaths();
+}
+
+void MergingSnapshotUpdate::FailAnyDelete() {
+  data_filter_manager_.FailAnyDelete();
+  delete_filter_manager_.FailAnyDelete();
+}
+
+void MergingSnapshotUpdate::SetNewDataFilesDataSequenceNumber(int64_t 
sequence_number) {
+  new_data_files_data_seq_number_ = sequence_number;
+}
+
+void MergingSnapshotUpdate::CaseSensitive(bool case_sensitive) {
+  case_sensitive_ = case_sensitive;
+  data_filter_manager_.CaseSensitive(case_sensitive);
+  delete_filter_manager_.CaseSensitive(case_sensitive);
+}
+
+void MergingSnapshotUpdate::Set(const std::string& property, const 
std::string& value) {
+  summary_builder().Set(property, value);
+}
+
+Result<std::shared_ptr<PartitionSpec>> MergingSnapshotUpdate::DataSpec() const 
{
+  if (new_data_files_by_spec_.empty()) {
+    return InvalidArgument("DataSpec() called before any data file was added");
+  }
+  if (new_data_files_by_spec_.size() > 1) {
+    return InvalidArgument(
+        "DataSpec() requires exactly one partition spec; got {} different 
specs",
+        new_data_files_by_spec_.size());
+  }
+  return base().PartitionSpecById(new_data_files_by_spec_.begin()->first);
+}
+
+std::vector<std::shared_ptr<DataFile>> MergingSnapshotUpdate::AddedDataFiles() 
const {
+  std::vector<std::shared_ptr<DataFile>> result;
+  for (const auto& [spec_id, files] : new_data_files_by_spec_) {
+    for (const auto& file : files) {
+      result.push_back(file);
+    }
+  }
+  return result;
+}
+
+Status MergingSnapshotUpdate::AddManifest(ManifestFile manifest) {
+  if (manifest.content != ManifestContent::kData) {
+    return InvalidArgument("Cannot append delete manifest: {}", 
manifest.manifest_path);
+  }
+  if (can_inherit_snapshot_id() && manifest.added_snapshot_id == 
kInvalidSnapshotId) {
+    if (manifest.first_row_id.has_value()) {
+      return InvalidArgument("Cannot append manifest with assigned first row 
ID: {}",
+                             manifest.manifest_path);
+    }
+    appended_manifests_summary_.AddedManifest(manifest);
+    append_manifests_.push_back(std::move(manifest));
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(auto copied, CopyManifest(manifest));
+    rewritten_append_manifests_.push_back(std::move(copied));
+  }
+  return {};
+}
+
+Result<ManifestFile> MergingSnapshotUpdate::CopyManifest(const ManifestFile& 
manifest) {
+  const TableMetadata& current = base();
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, SnapshotUtil::SchemaFor(current, 
target_branch()));
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
current.PartitionSpecById(manifest.partition_spec_id));
+  std::string path = ManifestPath();
+  all_written_manifests_.insert(path);
+  return CopyAppendManifest(manifest, ctx_->table->io(), schema, spec, 
SnapshotId(), path,
+                            current.format_version, 
&appended_manifests_summary_);
+}
+
+// -------------------------------------------------------------------------
+// State queries
+// -------------------------------------------------------------------------
+
+bool MergingSnapshotUpdate::AddsDataFiles() const {
+  return !new_data_files_by_spec_.empty();
+}
+
+bool MergingSnapshotUpdate::AddsDeleteFiles() const { return 
!new_delete_files_.empty(); }
+
+bool MergingSnapshotUpdate::DeletesDataFiles() const {
+  return data_filter_manager_.ContainsDeletes();
+}
+
+bool MergingSnapshotUpdate::DeletesDeleteFiles() const {
+  return delete_filter_manager_.ContainsDeletes();
+}
+
+// -------------------------------------------------------------------------
+// Apply pipeline
+// -------------------------------------------------------------------------
+
+ManifestWriterFactory MergingSnapshotUpdate::MakeTrackedWriterFactory(
+    const std::shared_ptr<Schema>& schema) {
+  return
+      [this, schema](int32_t spec_id,
+                     ManifestContent content) -> 
Result<std::unique_ptr<ManifestWriter>> {
+        const TableMetadata& meta = base();
+        ICEBERG_ASSIGN_OR_RAISE(auto spec, meta.PartitionSpecById(spec_id));
+        std::string path = ManifestPath();
+        all_written_manifests_.insert(path);
+        return ManifestWriter::MakeWriter(meta.format_version, SnapshotId(),
+                                          std::move(path), ctx_->table->io(),
+                                          std::move(spec), schema, content);
+      };
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDataManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_data_files_ && cached_new_data_manifests_.has_value()) {
+    for (const auto& m : *cached_new_data_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_data_manifests_.reset();
+  }
+
+  if (cached_new_data_manifests_.has_value()) {
+    return *cached_new_data_manifests_;
+  }
+
+  std::vector<ManifestFile> result;
+  for (const auto& [spec_id, data_files] : new_data_files_by_spec_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto written,
+        WriteDataManifests(data_files.as_span(), spec, 
new_data_files_data_seq_number_));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_data_manifests_ = result;
+  has_new_data_files_ = false;
+  return result;
+}
+
+Result<std::vector<MergingSnapshotUpdate::PendingDeleteFile>>
+MergingSnapshotUpdate::NormalizeNewDeleteFiles() const {
+  std::vector<PendingDeleteFile> result;
+  result.reserve(new_delete_files_.size());
+
+  std::unordered_set<DeleteFileObjectKey, DeleteFileObjectKeyHash> 
seen_delete_files;
+  std::unordered_map<std::string, DeleteFileObjectKey> 
dv_by_referenced_data_file;
+
+  for (const auto& pending_file : new_delete_files_) {
+    const auto& file = pending_file.file;
+    ICEBERG_PRECHECK(file != nullptr, "Cannot add a null delete file");
+
+    auto key = MakeDeleteFileObjectKey(*file);
+    if (!seen_delete_files.insert(key).second) {
+      continue;
+    }
+
+    if (ContentFileUtil::IsDV(*file)) {
+      ICEBERG_PRECHECK(file->referenced_data_file.has_value(),
+                       "DV must have a referenced data file: {}", 
file->file_path);
+      auto [it, inserted] =
+          dv_by_referenced_data_file.emplace(*file->referenced_data_file, key);
+      if (!inserted && it->second != key) {
+        return NotImplemented(
+            "Cannot merge multiple deletion vectors for referenced data file: 
{}",
+            *file->referenced_data_file);
+      }
+    }
+
+    result.push_back(pending_file);
+  }
+
+  return result;
+}
+
+Result<std::vector<ManifestFile>> 
MergingSnapshotUpdate::WriteNewDeleteManifests() {
+  // If new files were staged after the cache was populated (commit retry), 
invalidate.
+  if (has_new_delete_files_ && cached_new_delete_manifests_.has_value()) {
+    for (const auto& m : *cached_new_delete_manifests_) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+    cached_new_delete_manifests_.reset();
+  }
+
+  if (cached_new_delete_manifests_.has_value()) {
+    return *cached_new_delete_manifests_;
+  }
+
+  // Group delete files by partition spec ID, mirroring 
WriteNewDataManifests().
+  std::unordered_map<int32_t, std::vector<PendingDeleteFile>> 
delete_files_by_spec;
+  for (const auto& pending_file : new_delete_files_) {
+    
delete_files_by_spec[pending_file.file->partition_spec_id.value()].push_back(
+        pending_file);
+  }
+
+  std::vector<ManifestFile> result;
+  for (auto& [spec_id, delete_files] : delete_files_by_spec) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, base().PartitionSpecById(spec_id));
+    std::vector<DeleteManifestEntry> delete_entries;
+    delete_entries.reserve(delete_files.size());
+    for (const auto& pending_file : delete_files) {
+      delete_entries.push_back(DeleteManifestEntry{
+          .file = pending_file.file,
+          .data_sequence_number = pending_file.data_sequence_number,
+      });
+    }
+    ICEBERG_ASSIGN_OR_RAISE(auto written, WriteDeleteManifests(delete_entries, 
spec));
+    for (const auto& m : written) {
+      all_written_manifests_.insert(m.manifest_path);
+    }
+    result.insert(result.end(), std::make_move_iterator(written.begin()),
+                  std::make_move_iterator(written.end()));
+  }
+
+  cached_new_delete_manifests_ = result;
+  has_new_delete_files_ = false;
+  return result;
+}
+
+Result<std::vector<ManifestFile>> MergingSnapshotUpdate::Apply(
+    const TableMetadata& metadata_to_update, const std::shared_ptr<Snapshot>& 
snapshot) {
+  // Re-validate buffered delete files against the current format version. A 
format
+  // upgrade between staging and commit could make previously-valid files 
invalid.
+  for (const auto& pending_file : new_delete_files_) {
+    ICEBERG_RETURN_UNEXPECTED(ValidateNewDeleteFile(*pending_file.file));
+  }
+  ICEBERG_ASSIGN_OR_RAISE(auto normalized_delete_files, 
NormalizeNewDeleteFiles());
+  new_delete_files_ = std::move(normalized_delete_files);
+
+  added_delete_files_summary_.Clear();
+  for (const auto& pending_file : new_delete_files_) {
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, metadata_to_update.PartitionSpecById(
+                                           
*pending_file.file->partition_spec_id));
+    ICEBERG_RETURN_UNEXPECTED(
+        added_delete_files_summary_.AddedFile(*spec, *pending_file.file));
+  }
+
+  // Rebuild summary from stable sub-builders so that commit retries don't 
double-count.
+  summary_builder().Clear();
+  summary_builder().Merge(added_data_files_summary_);
+  summary_builder().Merge(added_delete_files_summary_);
+  summary_builder().Merge(appended_manifests_summary_);
+
+  ICEBERG_ASSIGN_OR_RAISE(auto target_schema,
+                          SnapshotUtil::SchemaFor(metadata_to_update, 
target_branch()));
+  auto tracked_factory = MakeTrackedWriterFactory(target_schema);
+
+  // Step 1: Filter data manifests.
+  ICEBERG_ASSIGN_OR_RAISE(auto filtered_data, 
data_filter_manager_.FilterManifests(
+                                                  target_schema, 
metadata_to_update,
+                                                  snapshot, tracked_factory));
+
+  // Track deleted data files in the summary builder.
+  for (const auto& file : data_filter_manager_.DeletedFiles()) {
+    if (!file->partition_spec_id.has_value()) {
+      continue;
+    }
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto spec, 
metadata_to_update.PartitionSpecById(*file->partition_spec_id));
+    ICEBERG_RETURN_UNEXPECTED(summary_builder().DeletedFile(*spec, *file));
+  }
+  summary_builder().IncrementDuplicateDeletes(
+      data_filter_manager_.DuplicateDeletesCount());
+
+  // Step 2: Compute min data sequence number; set up delete filter cleanup.
+  // Use last_sequence_number as the initial value so that an empty filtered 
list
+  // produces a sensible minimum. Skip manifests with 
kUnassignedSequenceNumber —
+  // those are manifests written in the current Apply() call whose sequence 
number
+  // hasn't been assigned yet. If all filtered manifests are unassigned (e.g. 
the
+  // table has no pre-existing data manifests), the fallback to 
last_sequence_number
+  // is safe: any delete file with seq > 0 and seq <= last_sequence_number can 
no
+  // longer match live data rows, so cleaning them up is correct.
+  int64_t min_data_seq = metadata_to_update.last_sequence_number;
+  for (const auto& manifest : filtered_data) {
+    if (manifest.min_sequence_number != kUnassignedSequenceNumber) {
+      min_data_seq = std::min(min_data_seq, manifest.min_sequence_number);
+    }
+  }
+  delete_filter_manager_.DropDeleteFilesOlderThan(min_data_seq);
+  delete_filter_manager_.RemoveDanglingDeletesFor(
+      data_filter_manager_.FilesToBeDeleted());
+
+  // Step 3: Filter delete manifests.
+  ICEBERG_ASSIGN_OR_RAISE(auto filtered_deletes, 
delete_filter_manager_.FilterManifests(
+                                                     target_schema, 
metadata_to_update,
+                                                     snapshot, 
tracked_factory));
+
+  // Track deleted delete files in the summary builder.
+  for (const auto& file : delete_filter_manager_.DeletedFiles()) {
+    if (!file->partition_spec_id.has_value()) {
+      continue;
+    }
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto spec, 
metadata_to_update.PartitionSpecById(*file->partition_spec_id));
+    ICEBERG_RETURN_UNEXPECTED(summary_builder().DeletedFile(*spec, *file));
+  }
+  summary_builder().IncrementDuplicateDeletes(
+      delete_filter_manager_.DuplicateDeletesCount());
+
+  // Drop manifests with no live files — they carry no data and should not be 
merged
+  // into the new snapshot. Manifests written by the current snapshot are 
always kept
+  // regardless of live-file counts; the merge stage handles any that are 
empty.
+  int64_t snapshot_id = SnapshotId();
+  auto should_keep = [snapshot_id](const ManifestFile& m) {
+    return m.has_added_files() || m.has_existing_files() ||
+           m.added_snapshot_id == snapshot_id;
+  };
+  std::erase_if(filtered_data, [&](const ManifestFile& m) { return 
!should_keep(m); });
+  std::erase_if(filtered_deletes, [&](const ManifestFile& m) { return 
!should_keep(m); });
+
+  // Step 4: Write (or retrieve cached) new data manifests.
+  ICEBERG_ASSIGN_OR_RAISE(auto written_data_manifests, 
WriteNewDataManifests());
+
+  // Incorporate append manifests (from AddManifest), stamping each with the
+  // current snapshot ID. append_manifests_ are used directly (inherit path);
+  // rewritten_append_manifests_ were already copied with the snapshot ID.
+  std::vector<ManifestFile> new_data_manifests = 
std::move(written_data_manifests);
+  for (const auto& src : append_manifests_) {
+    ManifestFile m = src;
+    m.added_snapshot_id = snapshot_id;
+    new_data_manifests.push_back(std::move(m));
+  }
+  for (const auto& src : rewritten_append_manifests_) {
+    ManifestFile m = src;
+    m.added_snapshot_id = snapshot_id;
+    new_data_manifests.push_back(std::move(m));
+  }
+
+  // Step 5: Write (or retrieve cached) new delete manifests.
+  ICEBERG_ASSIGN_OR_RAISE(auto new_delete_manifests, 
WriteNewDeleteManifests());
+
+  // Step 6: Merge data manifests.
+  ICEBERG_ASSIGN_OR_RAISE(auto merged_data,
+                          data_merge_manager_.MergeManifests(
+                              filtered_data, new_data_manifests, SnapshotId(),
+                              metadata_to_update, ctx_->table->io(), 
tracked_factory));
+
+  // Step 7: Merge delete manifests.
+  ICEBERG_ASSIGN_OR_RAISE(auto merged_deletes,
+                          delete_merge_manager_.MergeManifests(
+                              filtered_deletes, new_delete_manifests, 
SnapshotId(),
+                              metadata_to_update, ctx_->table->io(), 
tracked_factory));
+
+  std::vector<ManifestFile> result;
+  result.reserve(merged_data.size() + merged_deletes.size());
+  result.insert(result.end(), std::make_move_iterator(merged_data.begin()),
+                std::make_move_iterator(merged_data.end()));
+  result.insert(result.end(), std::make_move_iterator(merged_deletes.begin()),
+                std::make_move_iterator(merged_deletes.end()));
+
+  // Manifest count summary.
+  int32_t manifests_created = 0;
+  int32_t manifests_kept = 0;
+  for (const auto& m : result) {
+    if (m.added_snapshot_id == snapshot_id) {
+      ++manifests_created;
+    } else {
+      ++manifests_kept;
+    }
+  }
+  int32_t replaced_manifests_count = 
data_filter_manager_.ReplacedManifestsCount() +
+                                     
delete_filter_manager_.ReplacedManifestsCount() +
+                                     
data_merge_manager_.ReplacedManifestsCount() +
+                                     
delete_merge_manager_.ReplacedManifestsCount();
+  summary_builder().SetManifestCounts(manifests_created, manifests_kept,
+                                      replaced_manifests_count);
+
+  return result;
+}
+
+void MergingSnapshotUpdate::CleanUncommitted(
+    const std::unordered_set<std::string>& committed) {
+  for (const auto& path : all_written_manifests_) {
+    if (!committed.contains(path)) {
+      std::ignore = DeleteFile(path);
+    }
+  }
+  all_written_manifests_.clear();
+  cached_new_data_manifests_.reset();
+  cached_new_delete_manifests_.reset();
+  has_new_data_files_ = false;
+  has_new_delete_files_ = false;
+
+  // rewritten_append_manifests_ are always owned by the table (copied by us),
+  // so delete any that were not committed.
+  for (const auto& m : rewritten_append_manifests_) {
+    if (!committed.contains(m.manifest_path)) {
+      std::ignore = DeleteFile(m.manifest_path);
+    }
+  }
+
+  // append_manifests_ are only owned by the table if the commit succeeded
+  // (i.e., at least one manifest was committed).
+  if (!committed.empty()) {
+    for (const auto& m : append_manifests_) {
+      if (!committed.contains(m.manifest_path)) {
+        std::ignore = DeleteFile(m.manifest_path);
+      }
+    }
+  }
+}
+
+std::unordered_map<std::string, std::string> MergingSnapshotUpdate::Summary() {
+  summary_builder().SetPartitionSummaryLimit(
+      base().properties.Get(TableProperties::kWritePartitionSummaryLimit));
+  return summary_builder().Build();
+}
+
+// -------------------------------------------------------------------------
+// Conflict-detection helpers
+// -------------------------------------------------------------------------
+
+Status MergingSnapshotUpdate::ValidateAddedDataFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    std::shared_ptr<Expression> filter, const std::shared_ptr<Snapshot>& 
parent,
+    std::shared_ptr<FileIO> io, bool case_sensitive) {
+  if (parent == nullptr) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto history, ValidationHistory(metadata, parent->snapshot_id, 
starting_snapshot_id,
+                                      {DataOperation::kAppend, 
DataOperation::kOverwrite},
+                                      ManifestContent::kData, io));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto conflict_path,
+      FindMatchingDataFile(metadata, history.manifests, 
ManifestStatus::kAdded, filter,
+                           nullptr, io, case_sensitive));
+  if (conflict_path.has_value()) {
+    return InvalidArgument(
+        "Found conflicting files that can contain rows matching {}: {}",
+        filter != nullptr ? filter->ToString() : "any expression", 
conflict_path.value());
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateDataFilesExist(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    const std::unordered_set<std::string>& file_paths, bool allow_deletes,
+    std::shared_ptr<Expression> filter, const std::shared_ptr<Snapshot>& 
parent,
+    std::shared_ptr<FileIO> io, bool case_sensitive) {
+  if (parent == nullptr || file_paths.empty()) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors,
+      ValidationAncestorsBetween(metadata, parent->snapshot_id, 
starting_snapshot_id));
+
+  // Build the full set of matching snapshot IDs first, then scan their 
manifests.
+  // The full set must be known before filtering manifests, since a manifest 
may have
+  // been written by a different snapshot in the ancestry range.
+  // Included operations: OVERWRITE and REPLACE always; DELETE when 
allow_deletes is
+  // false.
+  std::unordered_set<int64_t> matching_snapshot_ids;
+  for (const auto& snap : ancestors) {
+    auto op = snap->Operation();
+    if (op == DataOperation::kOverwrite || op == DataOperation::kReplace) {
+      matching_snapshot_ids.insert(snap->snapshot_id);
+    } else if (!allow_deletes && op == DataOperation::kDelete) {
+      matching_snapshot_ids.insert(snap->snapshot_id);
+    }
+  }
+
+  // Build a metrics evaluator for the conflict-detection filter, if provided.
+  std::unique_ptr<InclusiveMetricsEvaluator> evaluator;
+  if (filter != nullptr) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        evaluator, InclusiveMetricsEvaluator::Make(filter, *schema, 
case_sensitive));
+  }
+
+  for (const auto& snapshot : ancestors) {
+    if (!matching_snapshot_ids.contains(snapshot->snapshot_id)) {
+      continue;
+    }
+    auto cached = SnapshotCache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, cached.DataManifests(io));
+
+    for (const auto& manifest : data_manifests) {
+      if (!matching_snapshot_ids.contains(manifest.added_snapshot_id)) {
+        continue;
+      }
+      ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                              
metadata.PartitionSpecById(manifest.partition_spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(manifest, io, schema, 
spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+
+      for (const auto& entry : entries) {
+        if (entry.status != ManifestStatus::kDeleted) {
+          continue;
+        }
+        if (entry.data_file == nullptr) {
+          continue;
+        }
+        if (!file_paths.contains(entry.data_file->file_path)) {
+          continue;
+        }
+        if (evaluator != nullptr) {
+          ICEBERG_ASSIGN_OR_RAISE(bool matches, 
evaluator->Evaluate(*entry.data_file));
+          if (!matches) {
+            continue;
+          }
+        }
+        return InvalidArgument("Cannot commit, missing data files: {} in 
snapshot {}",
+                               entry.data_file->file_path, 
snapshot->snapshot_id);
+      }
+    }
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    const DataFileSet& replaced_files, const std::shared_ptr<Snapshot>& parent,
+    std::shared_ptr<FileIO> io, bool ignore_equality_deletes) {
+  if (parent == nullptr || replaced_files.empty() || metadata.format_version < 
2) {
+    return {};
+  }
+
+  // Build an index of delete files added since starting_snapshot_id.
+  // Covers both position and equality deletes; the caller controls whether
+  // equality deletes are ignored.
+  ICEBERG_ASSIGN_OR_RAISE(auto deletes, AddedDeleteFiles(metadata, 
starting_snapshot_id,
+                                                         nullptr, nullptr, 
parent, io));
+
+  if (deletes->empty()) {
+    return {};
+  }
+
+  // Compute the starting sequence number for the data file check.
+  int64_t starting_seq = TableMetadata::kInitialSequenceNumber;
+  if (auto snap_result = metadata.SnapshotById(starting_snapshot_id);
+      snap_result.has_value()) {
+    starting_seq = snap_result.value()->sequence_number;
+  }
+
+  for (const auto& data_file : replaced_files) {
+    ICEBERG_ASSIGN_OR_RAISE(auto delete_files,
+                            deletes->ForDataFile(starting_seq, *data_file));
+    if (ignore_equality_deletes) {
+      // Only fail on position deletes — equality deletes at higher sequence 
numbers
+      // still apply to the rewritten files and are not a conflict.
+      for (const auto& df : delete_files) {
+        if (df->content == DataFile::Content::kPositionDeletes) {
+          return InvalidArgument(
+              "Cannot commit, found new position delete for replaced data 
file: {}",
+              data_file->file_path);
+        }
+      }
+    } else {
+      if (!delete_files.empty()) {
+        return InvalidArgument(
+            "Cannot commit, found new delete for replaced data file: {}",
+            data_file->file_path);
+      }
+    }
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateAddedDataFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    const PartitionSet& partition_set, const std::shared_ptr<Snapshot>& parent,
+    std::shared_ptr<FileIO> io) {
+  if (parent == nullptr) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto history, ValidationHistory(metadata, parent->snapshot_id, 
starting_snapshot_id,
+                                      {DataOperation::kAppend, 
DataOperation::kOverwrite},
+                                      ManifestContent::kData, io));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto conflict_path,
+      FindMatchingDataFile(metadata, history.manifests, 
ManifestStatus::kAdded, nullptr,
+                           &partition_set, io, /*case_sensitive=*/true));
+  if (conflict_path.has_value()) {
+    return InvalidArgument(
+        "Found conflicting files that can contain rows in validated 
partitions: {}",
+        conflict_path.value());
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    std::shared_ptr<Expression> data_filter, const DataFileSet& replaced_files,
+    const std::shared_ptr<Snapshot>& parent, std::shared_ptr<FileIO> io) {
+  if (parent == nullptr || replaced_files.empty() || metadata.format_version < 
2) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto deletes,
+                          AddedDeleteFiles(metadata, starting_snapshot_id,
+                                           std::move(data_filter), nullptr, 
parent, io));
+  if (deletes->empty()) {
+    return {};
+  }
+
+  int64_t starting_seq = TableMetadata::kInitialSequenceNumber;
+  if (auto snap_result = metadata.SnapshotById(starting_snapshot_id);
+      snap_result.has_value()) {
+    starting_seq = snap_result.value()->sequence_number;
+  }
+
+  for (const auto& data_file : replaced_files) {
+    ICEBERG_ASSIGN_OR_RAISE(auto delete_files,
+                            deletes->ForDataFile(starting_seq, *data_file));
+    for (const auto& delete_file : delete_files) {
+      return InvalidArgument("Cannot commit, found new delete for replaced 
data file: {}",
+                             data_file->file_path);
+    }
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    std::shared_ptr<Expression> data_filter, const std::shared_ptr<Snapshot>& 
parent,
+    std::shared_ptr<FileIO> io) {
+  ICEBERG_ASSIGN_OR_RAISE(auto deletes,
+                          AddedDeleteFiles(metadata, starting_snapshot_id,
+                                           std::move(data_filter), nullptr, 
parent, io));
+  auto referenced_delete_files = deletes->ReferencedDeleteFiles();
+
+  for (const auto& delete_file : referenced_delete_files) {
+    return InvalidArgument("Found new conflicting delete files: {}",
+                           delete_file->file_path);
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    const PartitionSet& partition_set, const std::shared_ptr<Snapshot>& parent,
+    std::shared_ptr<FileIO> io) {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto deletes,
+      AddedDeleteFiles(metadata, starting_snapshot_id, nullptr,
+                       std::make_shared<PartitionSet>(partition_set), parent, 
io));
+  auto referenced_delete_files = deletes->ReferencedDeleteFiles();
+  for (const auto& delete_file : referenced_delete_files) {
+    return InvalidArgument(
+        "Found new conflicting delete files in validated partitions: {}",
+        delete_file->file_path);
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateDeletedDataFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    std::shared_ptr<Expression> data_filter, const std::shared_ptr<Snapshot>& 
parent,
+    std::shared_ptr<FileIO> io) {
+  if (parent == nullptr) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto history, ValidationHistory(metadata, parent->snapshot_id, 
starting_snapshot_id,
+                                      {DataOperation::kOverwrite, 
DataOperation::kReplace,
+                                       DataOperation::kDelete},
+                                      ManifestContent::kData, io));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto conflict_path,
+      FindMatchingDataFile(metadata, history.manifests, 
ManifestStatus::kDeleted,
+                           data_filter, nullptr, io, /*case_sensitive=*/true));
+  if (conflict_path.has_value()) {
+    return InvalidArgument(
+        "Found conflicting deleted files that can contain rows matching {}: 
{}",
+        data_filter != nullptr ? data_filter->ToString() : "any expression",
+        conflict_path.value());
+  }
+  return {};
+}
+
+Status MergingSnapshotUpdate::ValidateDeletedDataFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    const PartitionSet& partition_set, const std::shared_ptr<Snapshot>& parent,
+    std::shared_ptr<FileIO> io) {
+  if (parent == nullptr) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto history, ValidationHistory(metadata, parent->snapshot_id, 
starting_snapshot_id,
+                                      {DataOperation::kOverwrite, 
DataOperation::kReplace,
+                                       DataOperation::kDelete},
+                                      ManifestContent::kData, io));
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto conflict_path,
+      FindMatchingDataFile(metadata, history.manifests, 
ManifestStatus::kDeleted, nullptr,
+                           &partition_set, io, /*case_sensitive=*/true));
+  if (conflict_path.has_value()) {
+    return InvalidArgument("Found conflicting deleted files in validated 
partitions: {}",
+                           conflict_path.value());
+  }
+  return {};
+}
+
+Result<std::unique_ptr<DeleteFileIndex>> 
MergingSnapshotUpdate::AddedDeleteFiles(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    std::shared_ptr<Expression> data_filter, std::shared_ptr<PartitionSet> 
partition_set,
+    const std::shared_ptr<Snapshot>& parent, std::shared_ptr<FileIO> io,
+    bool case_sensitive) {
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+
+  if (parent == nullptr || metadata.format_version < 2) {
+    TableMetadataCache metadata_cache(&metadata);
+    ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, 
metadata_cache.GetPartitionSpecsById());
+    std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id(
+        specs_ref.get().begin(), specs_ref.get().end());
+    ICEBERG_ASSIGN_OR_RAISE(auto builder, DeleteFileIndex::BuilderFor(
+                                              io, schema, 
std::move(specs_by_id), {}));
+    return builder.Build();
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto history, ValidationHistory(metadata, parent->snapshot_id, 
starting_snapshot_id,
+                                      {DataOperation::kOverwrite, 
DataOperation::kDelete},
+                                      ManifestContent::kDeletes, io));
+
+  // Compute the starting sequence number from the starting snapshot.
+  int64_t starting_seq = TableMetadata::kInitialSequenceNumber;
+  if (auto snap_result = metadata.SnapshotById(starting_snapshot_id);
+      snap_result.has_value()) {
+    starting_seq = snap_result.value()->sequence_number;
+  }
+
+  TableMetadataCache metadata_cache(&metadata);
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_ref, 
metadata_cache.GetPartitionSpecsById());
+  std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id(
+      specs_ref.get().begin(), specs_ref.get().end());
+
+  ICEBERG_ASSIGN_OR_RAISE(auto builder,
+                          DeleteFileIndex::BuilderFor(io, schema, 
std::move(specs_by_id),
+                                                      
std::move(history.manifests)));
+  builder.AfterSequenceNumber(starting_seq);
+  builder.CaseSensitive(case_sensitive);
+  if (data_filter != nullptr) {
+    builder.DataFilter(std::move(data_filter));
+  }
+  if (partition_set != nullptr) {
+    builder.FilterPartitions(std::move(partition_set));
+  }
+  return builder.Build();
+}
+
+Status MergingSnapshotUpdate::ValidateAddedDVs(
+    const TableMetadata& metadata, int64_t starting_snapshot_id,
+    std::shared_ptr<Expression> conflict_filter, const 
std::shared_ptr<Snapshot>& parent,
+    std::shared_ptr<FileIO> io) {
+  if (parent == nullptr || metadata.format_version < 3) {
+    return {};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto deletes, AddedDeleteFiles(metadata, starting_snapshot_id,

Review Comment:
   Agreed. Using the generic AddedDeleteFiles index is too broad here. I’ll 
change this to match Java’s validation semantics: scan the relevant 
overwrite/delete/replace history and only reject DVs whose referenced data file 
is also in this operation’s staged DV set.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to