wgtmac commented on code in PR #682:
URL: https://github.com/apache/iceberg-cpp/pull/682#discussion_r3322549766


##########
src/iceberg/util/snapshot_util.cc:
##########
@@ -375,7 +375,7 @@ Result<std::shared_ptr<Schema>> 
SnapshotUtil::SchemaFor(const Table& table,
 
   const auto& metadata = table.metadata();
   auto it = metadata->refs.find(ref);
-  if (it == metadata->refs.cend() || it->second->type() == 
SnapshotRefType::kBranch) {
+  if (it == metadata->refs.cend()) {

Review Comment:
   Keep Java parity here: branches should return the current table schema, and 
only tags should use the referenced snapshot schema. Please keep the branch 
check in both SchemaFor(ref) overloads.



##########
src/iceberg/update/merging_snapshot_update.h:
##########
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+/// \file iceberg/update/merging_snapshot_update.h
+
+#include <cstdint>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/delete_file_index.h"
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_filter_manager.h"
+#include "iceberg/manifest/manifest_merge_manager.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/snapshot_update.h"
+#include "iceberg/util/data_file_set.h"
+
+namespace iceberg {
+
+/// \brief Abstract base class for all merge-based snapshot write operations.
+///
+/// Provides the complete filter → write → merge pipeline that all merge-based
+/// operations (MergeAppend, OverwriteFiles, RowDelta, ReplacePartitions,
+/// RewriteFiles) share. Subclasses only need to implement `operation()` and
+/// call the protected primitive API to describe what changes to make.
+///
+/// The Apply() pipeline:
+///   1. Filter data manifests (via data_filter_manager_)
+///   2. Compute min data sequence number and set up delete filter cleanup
+///   3. Filter delete manifests (via delete_filter_manager_)
+///   4. Write new data manifests (cached for commit retry)
+///   5. Write new delete manifests (cached for commit retry)
+///   6. Merge data manifests (via data_merge_manager_)
+///   7. Merge delete manifests (via delete_merge_manager_)
+///
+class ICEBERG_EXPORT MergingSnapshotUpdate : public SnapshotUpdate {
+ public:
+  ~MergingSnapshotUpdate() override = default;
+
+  // SnapshotUpdate overrides
+  Result<std::vector<ManifestFile>> Apply(
+      const TableMetadata& metadata_to_update,
+      const std::shared_ptr<Snapshot>& snapshot) override;
+
+  void CleanUncommitted(const std::unordered_set<std::string>& committed) 
override;
+
+  std::unordered_map<std::string, std::string> Summary() override;
+
+  /// \brief Set a custom property in the snapshot summary.
+  void Set(const std::string& property, const std::string& value);
+
+ protected:
+  /// \brief Constructor; reads merge configuration from table properties.
+  explicit MergingSnapshotUpdate(std::string table_name,
+                                 std::shared_ptr<TransactionContext> ctx);
+
+  /// \brief Stage a data file to be added to the table.
+  Status AddDataFile(std::shared_ptr<DataFile> file);
+
+  /// \brief Stage a delete file to be added to the table.
+  Status AddDeleteFile(std::shared_ptr<DataFile> file);
+
+  /// \brief Validate a delete file against the table format version rules.
+  ///
+  /// - Format v1: deletes are not supported.
+  /// - Format v2: position deletes must NOT be deletion vectors (DVs).
+  /// - Format v3+: position deletes MUST be deletion vectors (DVs).
+  Status ValidateNewDeleteFile(const DataFile& file);
+
+  /// \brief Stage a delete file with an explicit data sequence number.
+  ///
+  Status AddDeleteFile(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number);
+
+  /// \brief Add all files in a pre-existing data manifest to the new snapshot.
+  ///
+  /// The manifest must contain DATA content. If snapshot ID inheritance is
+  /// enabled and the manifest has no snapshot ID assigned, it is used 
directly;
+  /// otherwise it is copied with the current snapshot ID.
+  Status AddManifest(ManifestFile manifest);
+
+  /// \brief Register a data file (by object) to be deleted from the table.
+  Status DeleteDataFile(std::shared_ptr<DataFile> file);
+
+  /// \brief Register a delete file (by object) to be removed from the table.
+  Status DeleteDeleteFile(std::shared_ptr<DataFile> file);
+
+  /// \brief Register a data file path to be deleted from the table.
+  ///
+  /// \note Only applies to data files. To remove delete files, use 
DeleteDeleteFile().
+  void DeleteByPath(std::string_view path);
+
+  /// \brief Register an expression to delete matching rows.
+  ///
+  /// Both data and delete filter managers receive the expression: delete 
files that
+  /// match the row filter can also be removed because those rows will be 
deleted.
+  Status DeleteByRowFilter(std::shared_ptr<Expression> expr);
+
+  /// \brief Register a partition to be dropped.
+  ///
+  /// Both data and delete filter managers receive the partition drop, since 
dropping
+  /// data in a partition also drops all delete files in that partition.
+  void DropPartition(int32_t spec_id, PartitionValues partition);
+
+  /// \brief Fail if any registered delete path is not found in any manifest.
+  void FailMissingDeletePaths();
+
+  /// \brief Fail if any manifest entry matches a delete condition.
+  void FailAnyDelete();
+
+  /// \brief Override the data sequence number assigned to all newly-added 
data files.
+  void SetNewDataFilesDataSequenceNumber(int64_t sequence_number);
+
+  /// \brief Set case sensitivity for row filter and expression evaluation.
+  void CaseSensitive(bool case_sensitive);
+
+  /// \brief Returns true if case-sensitive matching is enabled (default: 
true).
+  bool IsCaseSensitive() const { return case_sensitive_; }
+
+  /// \brief Returns true if any data files have been staged for addition.
+  bool AddsDataFiles() const;
+
+  /// \brief Returns true if any delete files have been staged for addition.
+  bool AddsDeleteFiles() const;
+
+  /// \brief Returns true if any data files have been registered for deletion.
+  bool DeletesDataFiles() const;
+
+  /// \brief Returns true if any delete files have been registered for removal.
+  bool DeletesDeleteFiles() const;
+
+  /// \brief Returns the row-filter expression set via DeleteByRowFilter, or 
nullptr.
+  const std::shared_ptr<Expression>& RowFilter() const { return 
delete_expression_; }
+
+  /// \brief Returns the single partition spec for all staged data files.
+  ///
+  /// Precondition: exactly one partition spec ID must be represented among 
staged
+  /// data files.
+  Result<std::shared_ptr<PartitionSpec>> DataSpec() const;
+
+  /// \brief Returns all data files staged for addition.
+  std::vector<std::shared_ptr<DataFile>> AddedDataFiles() const;
+
+  /// \brief Return an error if any snapshot in [starting_snapshot_id+1, 
parent]
+  /// added a data file matching the given filter expression.
+  static Status ValidateAddedDataFiles(const TableMetadata& metadata,
+                                       int64_t starting_snapshot_id,

Review Comment:
   Java allows startingSnapshotId to be null and validates back to the first 
snapshot. Please make this optional through these validation helpers so default 
RowDelta/Overwrite/Rewrite validation matches Java.



##########
src/iceberg/test/merging_snapshot_update_test.cc:
##########
@@ -0,0 +1,1278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/merging_snapshot_update.h"
+
+#include <memory>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/constants.h"
+#include "iceberg/expression/expressions.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/row/partition_values.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+#include "iceberg/transaction.h"
+#include "iceberg/update/fast_append.h"
+#include "iceberg/update/update_properties.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+/// \brief Concrete subclass of MergingSnapshotUpdate for testing.
+class TestMergeAppend : public MergingSnapshotUpdate {
+ public:
+  static Result<std::unique_ptr<TestMergeAppend>> Make(std::string table_name,
+                                                       std::shared_ptr<Table> 
table) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto ctx, TransactionContext::Make(std::move(table), 
TransactionKind::kUpdate));
+    return std::unique_ptr<TestMergeAppend>(
+        new TestMergeAppend(std::move(table_name), std::move(ctx)));
+  }
+
+  std::string operation() override { return "append"; }
+
+  // Expose protected API for test access
+  Status AddFile(std::shared_ptr<DataFile> file) { return 
AddDataFile(std::move(file)); }
+  Status AddDelete(std::shared_ptr<DataFile> file) {
+    return AddDeleteFile(std::move(file));
+  }
+  Status AddDelete(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number) {
+    return AddDeleteFile(std::move(file), data_sequence_number);
+  }
+  Status RemoveDataFile(std::shared_ptr<DataFile> file) {
+    return DeleteDataFile(std::move(file));
+  }
+  Status RemoveDeleteFile(std::shared_ptr<DataFile> file) {
+    return DeleteDeleteFile(std::move(file));
+  }
+  Status AppendManifest(ManifestFile manifest) {
+    return AddManifest(std::move(manifest));
+  }
+  Result<std::shared_ptr<PartitionSpec>> DataSpec() const {
+    return MergingSnapshotUpdate::DataSpec();
+  }
+  int64_t GeneratedSnapshotId() { return SnapshotId(); }
+  void SetDataSeqNumber(int64_t seq) { SetNewDataFilesDataSequenceNumber(seq); 
}
+  static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
+                                              int64_t starting_snapshot_id,
+                                              const std::shared_ptr<Snapshot>& 
parent,
+                                              std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDataFiles(metadata, 
starting_snapshot_id,
+                                                         nullptr, parent, 
std::move(io));
+  }
+  static Status ValidateAddedDataFilesForTest(const TableMetadata& metadata,
+                                              int64_t starting_snapshot_id,
+                                              const PartitionSet& 
partition_set,
+                                              const std::shared_ptr<Snapshot>& 
parent,
+                                              std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDataFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateNoNewDeletesForDataFilesForTest(
+      const TableMetadata& metadata, int64_t starting_snapshot_id,
+      std::shared_ptr<Expression> data_filter, const DataFileSet& 
replaced_files,
+      const std::shared_ptr<Snapshot>& parent, std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeletesForDataFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), 
replaced_files, parent,
+        std::move(io));
+  }
+  static Status ValidateNoNewDeleteFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                std::shared_ptr<Expression> 
data_filter,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), parent, 
std::move(io));
+  }
+  static Status ValidateNoNewDeleteFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                const PartitionSet& 
partition_set,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateNoNewDeleteFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateDeletedDataFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                std::shared_ptr<Expression> 
data_filter,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateDeletedDataFiles(
+        metadata, starting_snapshot_id, std::move(data_filter), parent, 
std::move(io));
+  }
+  static Status ValidateDeletedDataFilesForTest(const TableMetadata& metadata,
+                                                int64_t starting_snapshot_id,
+                                                const PartitionSet& 
partition_set,
+                                                const 
std::shared_ptr<Snapshot>& parent,
+                                                std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateDeletedDataFiles(
+        metadata, starting_snapshot_id, partition_set, parent, std::move(io));
+  }
+  static Status ValidateAddedDVsForTest(
+      const TableMetadata& metadata, int64_t starting_snapshot_id,
+      std::shared_ptr<Expression> conflict_filter,
+      const std::unordered_set<std::string>& referenced_data_files,
+      const std::shared_ptr<Snapshot>& parent, std::shared_ptr<FileIO> io) {
+    return MergingSnapshotUpdate::ValidateAddedDVs(
+        metadata, starting_snapshot_id, std::move(conflict_filter), 
referenced_data_files,
+        parent, std::move(io));
+  }
+
+  bool HasDataFiles() const { return AddsDataFiles(); }
+  bool HasDeleteFiles() const { return AddsDeleteFiles(); }
+  bool HasDataDeletes() const { return DeletesDataFiles(); }
+
+ private:
+  TestMergeAppend(std::string table_name, std::shared_ptr<TransactionContext> 
ctx)
+      : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {}
+};
+
+class TestOverwriteUpdate : public MergingSnapshotUpdate {
+ public:
+  static Result<std::unique_ptr<TestOverwriteUpdate>> Make(std::string 
table_name,
+                                                           
std::shared_ptr<Table> table) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto ctx, TransactionContext::Make(std::move(table), 
TransactionKind::kUpdate));
+    return std::unique_ptr<TestOverwriteUpdate>(
+        new TestOverwriteUpdate(std::move(table_name), std::move(ctx)));
+  }
+
+  std::string operation() override { return DataOperation::kOverwrite; }
+  int64_t GeneratedSnapshotId() { return SnapshotId(); }
+
+  Status AddDelete(std::shared_ptr<DataFile> file) {
+    return AddDeleteFile(std::move(file));
+  }
+  Status AddDelete(std::shared_ptr<DataFile> file, int64_t 
data_sequence_number) {
+    return AddDeleteFile(std::move(file), data_sequence_number);
+  }
+  Status RemoveDataFile(std::shared_ptr<DataFile> file) {
+    return DeleteDataFile(std::move(file));
+  }
+
+ private:
+  TestOverwriteUpdate(std::string table_name, 
std::shared_ptr<TransactionContext> ctx)
+      : MergingSnapshotUpdate(std::move(table_name), std::move(ctx)) {}
+};
+
+class MergingSnapshotUpdateTest : public MinimalUpdateTestBase {
+ protected:
+  static void SetUpTestSuite() { avro::RegisterAll(); }
+
+  void SetUp() override {
+    MinimalUpdateTestBase::SetUp();
+
+    ICEBERG_UNWRAP_OR_FAIL(spec_, table_->spec());
+    ICEBERG_UNWRAP_OR_FAIL(schema_, table_->schema());
+
+    file_a_ = MakeDataFile("/data/file_a.parquet", /*partition_x=*/1L);
+    file_b_ = MakeDataFile("/data/file_b.parquet", /*partition_x=*/2L);
+  }
+
+  std::shared_ptr<DataFile> MakeDataFile(const std::string& path, int64_t 
partition_x) {
+    auto f = std::make_shared<DataFile>();
+    f->content = DataFile::Content::kData;
+    f->file_path = table_location_ + path;
+    f->file_format = FileFormatType::kParquet;
+    f->partition = 
PartitionValues(std::vector<Literal>{Literal::Long(partition_x)});
+    f->file_size_in_bytes = 1024;
+    f->record_count = 100;
+    f->partition_spec_id = spec_->spec_id();
+    return f;
+  }
+
+  std::shared_ptr<DataFile> MakeDeleteFile(const std::string& path, int64_t 
partition_x) {
+    auto f = MakeDataFile(path, partition_x);
+    f->content = DataFile::Content::kPositionDeletes;
+    return f;
+  }
+
+  std::shared_ptr<DataFile> MakeEqualityDeleteFile(const std::string& path,
+                                                   int64_t partition_x) {
+    auto f = MakeDeleteFile(path, partition_x);
+    f->content = DataFile::Content::kEqualityDeletes;
+    f->equality_ids = {1};
+    return f;
+  }
+
+  Result<std::unique_ptr<TestMergeAppend>> NewMergeAppend() {
+    return TestMergeAppend::Make(TableName(), table_);
+  }
+
+  Result<std::unique_ptr<TestOverwriteUpdate>> NewOverwriteUpdate() {
+    return TestOverwriteUpdate::Make(TableName(), table_);
+  }
+
+  // Commit file_a_ with FastAppend and refresh the table.
+  void CommitFileA() {
+    ICEBERG_UNWRAP_OR_FAIL(auto fa, table_->NewFastAppend());
+    fa->AppendFile(file_a_);
+    EXPECT_THAT(fa->Commit(), IsOk());
+    EXPECT_THAT(table_->Refresh(), IsOk());
+  }
+
+  // Read all entries from a list of ManifestFiles.
+  Result<std::vector<ManifestEntry>> ReadAllEntries(
+      const std::vector<ManifestFile>& manifests, const TableMetadata& 
metadata) {
+    std::vector<ManifestEntry> result;
+    for (const auto& m : manifests) {
+      ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(m.partition_spec_id));
+      ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+      ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                              ManifestReader::Make(m, file_io_, schema, spec));
+      ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+      result.insert(result.end(), entries.begin(), entries.end());
+    }
+    return result;
+  }
+
+  // Write a manifest file containing the given data files.
+  // Returns a ManifestFile with added_snapshot_id = kInvalidSnapshotId so it
+  // is eligible for snapshot ID inheritance.
+  Result<ManifestFile> WriteManifest(
+      const std::string& path, const std::vector<std::shared_ptr<DataFile>>& 
files) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestWriter::MakeWriter(/*format_version=*/2, kInvalidSnapshotId, 
path,
+                                   file_io_, spec_, schema_, 
ManifestContent::kData));
+    for (const auto& f : files) {
+      ManifestEntry entry;
+      entry.status = ManifestStatus::kAdded;
+      entry.snapshot_id = std::nullopt;
+      entry.data_file = f;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->ToManifestFile();
+  }
+
+  Result<ManifestFile> WriteDeleteManifest(
+      const TableMetadata& metadata, const std::string& path,
+      const std::vector<std::shared_ptr<DataFile>>& files, int64_t 
sequence_number) {
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+    ICEBERG_ASSIGN_OR_RAISE(auto spec, 
metadata.PartitionSpecById(spec_->spec_id()));
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestWriter::MakeWriter(metadata.format_version, 
/*snapshot_id=*/1L, path,
+                                   file_io_, spec, schema, 
ManifestContent::kDeletes));
+    for (const auto& f : files) {
+      ManifestEntry entry;
+      entry.status = ManifestStatus::kAdded;
+      entry.snapshot_id = 1L;
+      entry.sequence_number = sequence_number;
+      entry.data_file = f;
+      ICEBERG_RETURN_UNEXPECTED(writer->WriteAddedEntry(entry));
+    }
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+    return writer->ToManifestFile();
+  }
+
+  Result<std::shared_ptr<Snapshot>> MakeSyntheticSnapshot(
+      std::string operation, int64_t snapshot_id,
+      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
+      const std::vector<ManifestFile>& manifests) {
+    auto manifest_list_path = table_location_ + "/metadata/manifest-list-" +
+                              std::to_string(snapshot_id) + ".avro";
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto writer,
+        ManifestListWriter::MakeWriter(table_->metadata()->format_version, 
snapshot_id,
+                                       parent_snapshot_id, manifest_list_path, 
file_io_,
+                                       sequence_number));
+    ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+    ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+    ICEBERG_ASSIGN_OR_RAISE(
+        auto snapshot,
+        Snapshot::Make(sequence_number, snapshot_id, parent_snapshot_id, 
TimePointMs{},
+                       std::move(operation), {}, 
table_->metadata()->current_schema_id,
+                       manifest_list_path));
+    return std::shared_ptr<Snapshot>(std::move(snapshot));
+  }
+
+  std::shared_ptr<PartitionSpec> spec_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<DataFile> file_a_;
+  std::shared_ptr<DataFile> file_b_;
+};
+
+// -------------------------------------------------------------------------
+// State query tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, AddsDataFilesInitiallyFalse) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_FALSE(op->HasDataFiles());
+  EXPECT_FALSE(op->HasDeleteFiles());
+  EXPECT_FALSE(op->HasDataDeletes());
+}
+
+TEST_F(MergingSnapshotUpdateTest, AddsDataFilesTrueAfterAdd) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_TRUE(op->HasDataFiles());
+  EXPECT_FALSE(op->HasDeleteFiles());
+}
+
+TEST_F(MergingSnapshotUpdateTest, AddsDeleteFilesTrueAfterAdd) {
+  auto del_file = MakeEqualityDeleteFile("/delete/del_a.parquet", 1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_FALSE(op->HasDataFiles());
+  EXPECT_TRUE(op->HasDeleteFiles());
+}
+
+TEST_F(MergingSnapshotUpdateTest, DeletesDataFilesTrueAfterRegisterDelete) {
+  CommitFileA();
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->RemoveDataFile(file_a_), IsOk());
+  EXPECT_TRUE(op->HasDataDeletes());
+}
+
+// -------------------------------------------------------------------------
+// Apply / Commit tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, CommitNewDataFile) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedRecords), "100");
+}
+
+TEST_F(MergingSnapshotUpdateTest, CommitMultipleDataFiles) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->AddFile(file_b_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "2");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedRecords), "200");
+}
+
+TEST_F(MergingSnapshotUpdateTest, CommitDataFileAndDeleteFile) {
+  auto del_file = MakeEqualityDeleteFile("/delete/del_a.parquet", 1L);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  // Data file summary
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
+}
+
+TEST_F(MergingSnapshotUpdateTest, CommitPreservesExistingManifests) {
+  // First append: file_a
+  CommitFileA();
+
+  // Second merge append: file_b
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_b_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  // Both data files should be visible — 1 existing + 1 new
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "2");
+}
+
+TEST_F(MergingSnapshotUpdateTest, CommitDeletesDataFile) {
+  CommitFileA();
+
+  // Remove file_a via merging snapshot update
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->RemoveDataFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "0");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kDeletedDataFiles), 
"1");
+}
+
+TEST_F(MergingSnapshotUpdateTest, SetNewDataFilesDataSequenceNumber) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  op->SetDataSeqNumber(42);
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
+}
+
+TEST_F(MergingSnapshotUpdateTest, CustomSummaryPropertySurvivesApplyRebuild) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  op->Set("custom-prop", "custom-value");
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at("custom-prop"), "custom-value");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
+}
+
+// -------------------------------------------------------------------------
+// CleanUncommitted test
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, 
CleanUncommittedAfterSuccessfulCommitDoesNotCrash) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  // Simulate a caller invoking CleanUncommitted after a commit (e.g. cleanup
+  // in an error handler that runs regardless of success). Passing an empty set
+  // means no manifests are considered committed, so CleanUncommitted attempts
+  // to delete all written manifests. This should not crash.
+  op->CleanUncommitted({});
+}
+
+// -------------------------------------------------------------------------
+// Delete file summary tests
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, CommitDeleteFileSummaryHasAddedDeleteFiles) {
+  auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), 
"1");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeleteFiles), 
"1");
+  
EXPECT_EQ(snapshot->summary.count(SnapshotSummaryFields::kRemovedDeleteFiles), 
0);
+}
+
+TEST_F(MergingSnapshotUpdateTest, CommitDeduplicatesStagedDeleteFiles) {
+  auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_THAT(op->AddDelete(del_file), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDeleteFiles), 
"1");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedPosDeleteFiles), 
"1");
+}
+
+TEST_F(MergingSnapshotUpdateTest, 
AddDeleteFileWithExplicitSequenceWritesSequenceNumber) {
+  auto del_file = MakeEqualityDeleteFile("/delete/del_a.parquet", 1L);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddDelete(del_file, 17), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto manifests, op->Apply(*table_->metadata(), 
nullptr));
+  auto delete_manifest_it =
+      std::ranges::find_if(manifests, [](const ManifestFile& manifest) {
+        return manifest.content == ManifestContent::kDeletes;
+      });
+  ASSERT_NE(delete_manifest_it, manifests.end());
+  ICEBERG_UNWRAP_OR_FAIL(auto entries,
+                         
ReadAllEntries(std::vector<ManifestFile>{*delete_manifest_it},
+                                        *table_->metadata()));
+  ASSERT_EQ(entries.size(), 1U);
+  ASSERT_TRUE(entries[0].sequence_number.has_value());
+  EXPECT_EQ(entries[0].sequence_number.value(), 17);
+}
+
+// Covers the bug where deleted delete files were not tracked in the snapshot 
summary.
+TEST_F(MergingSnapshotUpdateTest, 
CommitDeletesDeleteFileSummaryHasRemovedDeleteFiles) {
+  // Step 1: commit a delete file.
+  auto del_file = MakeDeleteFile("/delete/del_a.parquet", 1L);
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+    EXPECT_THAT(op->AddDelete(del_file), IsOk());
+    EXPECT_THAT(op->Commit(), IsOk());
+    EXPECT_THAT(table_->Refresh(), IsOk());
+  }
+
+  // Step 2: commit a new snapshot that removes the delete file.
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->RemoveDeleteFile(del_file), IsOk());
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedDeleteFiles), 
"1");
+  
EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kRemovedPosDeleteFiles), 
"1");
+  EXPECT_EQ(snapshot->summary.count(SnapshotSummaryFields::kAddedDeleteFiles), 
0);
+}
+
+// -------------------------------------------------------------------------
+// Deduplication test
+// -------------------------------------------------------------------------
+
+TEST_F(MergingSnapshotUpdateTest, DuplicateDataFileOnlyCountedOnce) {
+  ICEBERG_UNWRAP_OR_FAIL(auto op, NewMergeAppend());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());
+  EXPECT_THAT(op->AddFile(file_a_), IsOk());  // duplicate — should be ignored
+  EXPECT_THAT(op->Commit(), IsOk());
+
+  EXPECT_THAT(table_->Refresh(), IsOk());
+  ICEBERG_UNWRAP_OR_FAIL(auto snapshot, table_->current_snapshot());
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kAddedDataFiles), "1");
+  EXPECT_EQ(snapshot->summary.at(SnapshotSummaryFields::kTotalDataFiles), "1");
+}
+
+// -------------------------------------------------------------------------
+// ValidateNewDeleteFile format version tests

Review Comment:
   Please add Java-parity tests for the v3 gates: reject non-DV position 
deletes on v3, allow DVs on v3, and reject a v2-staged position delete after 
upgrading to v3 before commit.



##########
src/iceberg/update/snapshot_update.cc:
##########
@@ -41,7 +41,7 @@ namespace iceberg {
 
 namespace {
 
-// The Java impl skips updating total if parsing fails. Here we choose to be 
strict.
+// Skips updating total if parsing fails would be lenient; here we choose to 
be strict.

Review Comment:
   This sentence is hard to parse. Consider: `Java skips updating totals if 
parsing fails; C++ treats parse failures as errors.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to