This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new a29355dd feat: add snapshot update (#408)
a29355dd is described below

commit a29355ddb904cc75aa8e04a591797f8f163b85a0
Author: Junwang Zhao <[email protected]>
AuthorDate: Tue Jan 13 13:45:56 2026 +0800

    feat: add snapshot update (#408)
---
 src/iceberg/CMakeLists.txt                  |   1 +
 src/iceberg/avro/avro_schema_util.cc        |  10 +-
 src/iceberg/json_internal.cc                |  16 +-
 src/iceberg/manifest/manifest_writer.cc     |  11 +-
 src/iceberg/manifest/manifest_writer.h      |   3 +-
 src/iceberg/meson.build                     |   1 +
 src/iceberg/snapshot.cc                     | 103 ++++++-
 src/iceberg/snapshot.h                      |  74 ++++-
 src/iceberg/table.h                         |  20 +-
 src/iceberg/table_metadata.cc               | 286 +++++++++++++++++-
 src/iceberg/table_metadata.h                |  25 +-
 src/iceberg/table_properties.h              |   4 +-
 src/iceberg/table_update.cc                 |   4 +-
 src/iceberg/table_update.h                  |  13 +
 src/iceberg/test/json_internal_test.cc      |   2 +-
 src/iceberg/test/snapshot_test.cc           |   2 +-
 src/iceberg/test/table_requirements_test.cc |  12 +-
 src/iceberg/test/update_properties_test.cc  |   2 +-
 src/iceberg/transaction.cc                  |  66 ++++-
 src/iceberg/transaction.h                   |  10 +-
 src/iceberg/type_fwd.h                      |   1 +
 src/iceberg/update/meson.build              |   1 +
 src/iceberg/update/pending_update.cc        |   6 +
 src/iceberg/update/pending_update.h         |  14 +-
 src/iceberg/update/snapshot_update.cc       | 434 ++++++++++++++++++++++++++++
 src/iceberg/update/snapshot_update.h        | 196 +++++++++++++
 src/iceberg/update/update_partition_spec.cc |  17 +-
 src/iceberg/update/update_properties.cc     |  18 +-
 src/iceberg/update/update_schema.cc         |   6 +-
 src/iceberg/update/update_sort_order.cc     |   5 +-
 src/iceberg/util/snapshot_util.cc           |  25 ++
 src/iceberg/util/snapshot_util_internal.h   |  28 +-
 src/iceberg/util/string_util.h              |  18 ++
 src/iceberg/util/timepoint.cc               |   7 +
 src/iceberg/util/timepoint.h                |   3 +
 src/iceberg/util/uuid.cc                    |  12 +
 src/iceberg/util/uuid.h                     |   3 +
 37 files changed, 1362 insertions(+), 97 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 3e0c66f9..23d2e4cd 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -82,6 +82,7 @@ set(ICEBERG_SOURCES
     transform_function.cc
     type.cc
     update/pending_update.cc
+    update/snapshot_update.cc
     update/update_partition_spec.cc
     update/update_properties.cc
     update/update_schema.cc
diff --git a/src/iceberg/avro/avro_schema_util.cc 
b/src/iceberg/avro/avro_schema_util.cc
index ba38c1f9..b19ffce7 100644
--- a/src/iceberg/avro/avro_schema_util.cc
+++ b/src/iceberg/avro/avro_schema_util.cc
@@ -17,7 +17,6 @@
  * under the License.
  */
 
-#include <charconv>
 #include <format>
 #include <mutex>
 #include <sstream>
@@ -40,6 +39,7 @@
 #include "iceberg/schema_util_internal.h"
 #include "iceberg/util/formatter.h"
 #include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
 #include "iceberg/util/visit_type.h"
 
 namespace iceberg::avro {
@@ -471,13 +471,7 @@ Result<int32_t> GetId(const ::avro::NodePtr& node, const 
std::string& attr_name,
     return InvalidSchema("Missing avro attribute: {}", attr_name);
   }
 
-  int32_t id;
-  const auto& id_value = id_str.value();
-  auto [_, ec] = std::from_chars(id_value.data(), id_value.data() + 
id_value.size(), id);
-  if (ec != std::errc()) {
-    return InvalidSchema("Invalid {}: {}", attr_name, id_value);
-  }
-  return id;
+  return StringUtils::ParseInt<int32_t>(id_str.value());
 }
 
 Result<int32_t> GetElementId(const ::avro::NodePtr& node) {
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index 669d0b50..4cd2f03f 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -399,7 +399,7 @@ nlohmann::json ToJson(const Snapshot& snapshot) {
   json[kTimestampMs] = UnixMsFromTimePointMs(snapshot.timestamp_ms);
   json[kManifestList] = snapshot.manifest_list;
   // If there is an operation, write the summary map
-  if (snapshot.operation().has_value()) {
+  if (snapshot.Operation().has_value()) {
     json[kSummary] = snapshot.summary;
   }
   SetOptionalField(json, kSchemaId, snapshot.schema_id);
@@ -1553,9 +1553,17 @@ Result<std::unique_ptr<TableUpdate>> 
TableUpdateFromJson(const nlohmann::json& j
                             GetJsonValueOptional<int64_t>(json, 
kMaxSnapshotAgeMs));
     ICEBERG_ASSIGN_OR_RAISE(auto max_ref_age,
                             GetJsonValueOptional<int64_t>(json, kMaxRefAgeMs));
-    return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), 
snapshot_id, type,
-                                                   min_snapshots, 
max_snapshot_age,
-                                                   max_ref_age);
+    if (type == SnapshotRefType::kTag) {
+      ICEBERG_ASSIGN_OR_RAISE(auto tag, SnapshotRef::MakeTag(snapshot_id, 
max_ref_age));
+      return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), 
*tag);
+    } else {
+      ICEBERG_CHECK(type == SnapshotRefType::kBranch,
+                    "Expected branch type for snapshot ref");
+      ICEBERG_ASSIGN_OR_RAISE(auto branch,
+                              SnapshotRef::MakeBranch(snapshot_id, 
min_snapshots,
+                                                      max_snapshot_age, 
max_ref_age));
+      return std::make_unique<table::SetSnapshotRef>(std::move(ref_name), 
*branch);
+    }
   }
   if (action == kActionSetProperties) {
     using StringMap = std::unordered_map<std::string, std::string>;
diff --git a/src/iceberg/manifest/manifest_writer.cc 
b/src/iceberg/manifest/manifest_writer.cc
index 0899869a..0045e2c0 100644
--- a/src/iceberg/manifest/manifest_writer.cc
+++ b/src/iceberg/manifest/manifest_writer.cc
@@ -369,23 +369,18 @@ Result<std::unique_ptr<ManifestWriter>> 
ManifestWriter::MakeWriter(
     int8_t format_version, std::optional<int64_t> snapshot_id,
     std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
     std::shared_ptr<PartitionSpec> partition_spec, std::shared_ptr<Schema> 
current_schema,
-    std::optional<ManifestContent> content, std::optional<int64_t> 
first_row_id) {
+    ManifestContent content, std::optional<int64_t> first_row_id) {
   switch (format_version) {
     case 1:
       return MakeV1Writer(snapshot_id, manifest_location, std::move(file_io),
                           std::move(partition_spec), 
std::move(current_schema));
     case 2:
-      ICEBERG_PRECHECK(content.has_value(),
-                       "ManifestContent is required for format version 2");
       return MakeV2Writer(snapshot_id, manifest_location, std::move(file_io),
-                          std::move(partition_spec), std::move(current_schema),
-                          content.value());
+                          std::move(partition_spec), 
std::move(current_schema), content);
     case 3:
-      ICEBERG_PRECHECK(content.has_value(),
-                       "ManifestContent is required for format version 3");
       return MakeV3Writer(snapshot_id, first_row_id, manifest_location,
                           std::move(file_io), std::move(partition_spec),
-                          std::move(current_schema), content.value());
+                          std::move(current_schema), content);
     default:
       return NotSupported("Format version {} is not supported", 
format_version);
   }
diff --git a/src/iceberg/manifest/manifest_writer.h 
b/src/iceberg/manifest/manifest_writer.h
index 5a095b28..288bda31 100644
--- a/src/iceberg/manifest/manifest_writer.h
+++ b/src/iceberg/manifest/manifest_writer.h
@@ -28,6 +28,7 @@
 
 #include "iceberg/file_writer.h"
 #include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
 #include "iceberg/metrics.h"
 #include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
@@ -175,7 +176,7 @@ class ICEBERG_EXPORT ManifestWriter {
       std::string_view manifest_location, std::shared_ptr<FileIO> file_io,
       std::shared_ptr<PartitionSpec> partition_spec,
       std::shared_ptr<Schema> current_schema,
-      std::optional<ManifestContent> content = std::nullopt,
+      ManifestContent content = ManifestContent::kData,
       std::optional<int64_t> first_row_id = std::nullopt);
 
  private:
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 3164f390..ead2ef2c 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -103,6 +103,7 @@ iceberg_sources = files(
     'transform_function.cc',
     'type.cc',
     'update/pending_update.cc',
+    'update/snapshot_update.cc',
     'update/update_partition_spec.cc',
     'update/update_properties.cc',
     'update/update_schema.cc',
diff --git a/src/iceberg/snapshot.cc b/src/iceberg/snapshot.cc
index ba85e049..dfa9a340 100644
--- a/src/iceberg/snapshot.cc
+++ b/src/iceberg/snapshot.cc
@@ -19,10 +19,13 @@
 
 #include "iceberg/snapshot.h"
 
+#include <memory>
+
 #include "iceberg/file_io.h"
 #include "iceberg/manifest/manifest_list.h"
 #include "iceberg/manifest/manifest_reader.h"
 #include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
 
 namespace iceberg {
 
@@ -49,6 +52,55 @@ SnapshotRefType SnapshotRef::type() const noexcept {
       retention);
 }
 
+Status SnapshotRef::Validate() const {
+  if (type() == SnapshotRefType::kBranch) {
+    const auto& branch = std::get<Branch>(this->retention);
+    ICEBERG_CHECK(!branch.min_snapshots_to_keep.has_value() ||
+                      branch.min_snapshots_to_keep.value() > 0,
+                  "Min snapshots to keep must be greater than 0");
+    ICEBERG_CHECK(
+        !branch.max_snapshot_age_ms.has_value() || 
branch.max_snapshot_age_ms.value() > 0,
+        "Max snapshot age must be greater than 0 ms");
+    ICEBERG_CHECK(!branch.max_ref_age_ms.has_value() || 
branch.max_ref_age_ms.value() > 0,
+                  "Max reference age must be greater than 0");
+  } else {
+    const auto& tag = std::get<Tag>(this->retention);
+    ICEBERG_CHECK(!tag.max_ref_age_ms.has_value() || 
tag.max_ref_age_ms.value() > 0,
+                  "Max reference age must be greater than 0");
+  }
+  return {};
+}
+
+Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeBranch(
+    int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep,
+    std::optional<int64_t> max_snapshot_age_ms, std::optional<int64_t> 
max_ref_age_ms) {
+  auto ref = std::make_unique<SnapshotRef>(
+      SnapshotRef{.snapshot_id = snapshot_id,
+                  .retention = Branch{
+                      .min_snapshots_to_keep = min_snapshots_to_keep,
+                      .max_snapshot_age_ms = max_snapshot_age_ms,
+                      .max_ref_age_ms = max_ref_age_ms,
+                  }});
+  ICEBERG_RETURN_UNEXPECTED(ref->Validate());
+  return ref;
+}
+
+Result<std::unique_ptr<SnapshotRef>> SnapshotRef::MakeTag(
+    int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms) {
+  auto ref = std::make_unique<SnapshotRef>(SnapshotRef{
+      .snapshot_id = snapshot_id, .retention = Tag{.max_ref_age_ms = 
max_ref_age_ms}});
+  ICEBERG_RETURN_UNEXPECTED(ref->Validate());
+  return ref;
+}
+
+std::unique_ptr<SnapshotRef> SnapshotRef::Clone(
+    std::optional<int64_t> new_snapshot_id) const {
+  auto ref = std::make_unique<SnapshotRef>();
+  ref->snapshot_id = new_snapshot_id.value_or(snapshot_id);
+  ref->retention = retention;
+  return ref;
+}
+
 bool SnapshotRef::Equals(const SnapshotRef& other) const {
   if (this == &other) {
     return true;
@@ -67,7 +119,7 @@ bool SnapshotRef::Equals(const SnapshotRef& other) const {
   }
 }
 
-std::optional<std::string_view> Snapshot::operation() const {
+std::optional<std::string_view> Snapshot::Operation() const {
   auto it = summary.find(SnapshotSummaryFields::kOperation);
   if (it != summary.end()) {
     return it->second;
@@ -75,6 +127,24 @@ std::optional<std::string_view> Snapshot::operation() const 
{
   return std::nullopt;
 }
 
+Result<std::optional<int64_t>> Snapshot::FirstRowId() const {
+  auto it = summary.find(SnapshotSummaryFields::kFirstRowId);
+  if (it == summary.end()) {
+    return std::nullopt;
+  }
+
+  return StringUtils::ParseInt<int64_t>(it->second);
+}
+
+Result<std::optional<int64_t>> Snapshot::AddedRows() const {
+  auto it = summary.find(SnapshotSummaryFields::kAddedRows);
+  if (it == summary.end()) {
+    return std::nullopt;
+  }
+
+  return StringUtils::ParseInt<int64_t>(it->second);
+}
+
 bool Snapshot::Equals(const Snapshot& other) const {
   if (this == &other) {
     return true;
@@ -85,6 +155,37 @@ bool Snapshot::Equals(const Snapshot& other) const {
          schema_id == other.schema_id;
 }
 
+Result<std::unique_ptr<Snapshot>> Snapshot::Make(
+    int64_t sequence_number, int64_t snapshot_id,
+    std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
+    std::string operation, std::unordered_map<std::string, std::string> 
summary,
+    std::optional<int32_t> schema_id, std::string manifest_list,
+    std::optional<int64_t> first_row_id, std::optional<int64_t> added_rows) {
+  ICEBERG_PRECHECK(!operation.empty(), "Operation cannot be empty");
+  ICEBERG_PRECHECK(!first_row_id.has_value() || first_row_id.value() >= 0,
+                   "Invalid first-row-id (cannot be negative): {}", 
first_row_id.value());
+  ICEBERG_PRECHECK(!added_rows.has_value() || added_rows.value() >= 0,
+                   "Invalid added-rows (cannot be negative): {}", 
added_rows.value());
+  ICEBERG_PRECHECK(!first_row_id.has_value() || added_rows.has_value(),
+                   "Missing added-rows when first-row-id is set");
+  summary[SnapshotSummaryFields::kOperation] = operation;
+  if (first_row_id.has_value()) {
+    summary[SnapshotSummaryFields::kFirstRowId] = 
std::to_string(first_row_id.value());
+  }
+  if (added_rows.has_value()) {
+    summary[SnapshotSummaryFields::kAddedRows] = 
std::to_string(added_rows.value());
+  }
+  return std::make_unique<Snapshot>(Snapshot{
+      .snapshot_id = snapshot_id,
+      .parent_snapshot_id = parent_snapshot_id,
+      .sequence_number = sequence_number,
+      .timestamp_ms = timestamp_ms,
+      .manifest_list = std::move(manifest_list),
+      .summary = std::move(summary),
+      .schema_id = schema_id,
+  });
+}
+
 Result<SnapshotCache::ManifestsCache> SnapshotCache::InitManifestsCache(
     const Snapshot* snapshot, std::shared_ptr<FileIO> file_io) {
   if (file_io == nullptr) {
diff --git a/src/iceberg/snapshot.h b/src/iceberg/snapshot.h
index 86a7fe20..e2ec0ccb 100644
--- a/src/iceberg/snapshot.h
+++ b/src/iceberg/snapshot.h
@@ -25,7 +25,6 @@
 #include <string>
 #include <string_view>
 #include <unordered_map>
-#include <utility>
 #include <variant>
 
 #include "iceberg/iceberg_export.h"
@@ -114,6 +113,39 @@ struct ICEBERG_EXPORT SnapshotRef {
 
   SnapshotRefType type() const noexcept;
 
+  /// \brief Create a branch reference
+  ///
+  /// \param snapshot_id The snapshot ID for the branch
+  /// \param min_snapshots_to_keep Optional minimum number of snapshots to keep
+  /// \param max_snapshot_age_ms Optional maximum snapshot age in milliseconds
+  /// \param max_ref_age_ms Optional maximum reference age in milliseconds
+  /// \return A Result containing a unique_ptr to the SnapshotRef, or an error 
if
+  /// validation failed
+  static Result<std::unique_ptr<SnapshotRef>> MakeBranch(
+      int64_t snapshot_id, std::optional<int32_t> min_snapshots_to_keep = 
std::nullopt,
+      std::optional<int64_t> max_snapshot_age_ms = std::nullopt,
+      std::optional<int64_t> max_ref_age_ms = std::nullopt);
+
+  /// \brief Create a tag reference
+  ///
+  /// \param snapshot_id The snapshot ID for the tag
+  /// \param max_ref_age_ms Optional maximum reference age in milliseconds
+  /// \return A Result containing a unique_ptr to the SnapshotRef, or an error 
if
+  /// validation failed
+  static Result<std::unique_ptr<SnapshotRef>> MakeTag(
+      int64_t snapshot_id, std::optional<int64_t> max_ref_age_ms = 
std::nullopt);
+
+  /// \brief Clone this SnapshotRef with an optional new snapshot ID
+  ///
+  /// \param new_snapshot_id Optional new snapshot ID. If not provided, uses 
the current
+  /// snapshot_id
+  /// \return A unique_ptr to the cloned SnapshotRef
+  std::unique_ptr<SnapshotRef> Clone(
+      std::optional<int64_t> new_snapshot_id = std::nullopt) const;
+
+  /// \brief Validate the SnapshotRef
+  Status Validate() const;
+
   /// \brief Compare two snapshot refs for equality
   friend bool operator==(const SnapshotRef& lhs, const SnapshotRef& rhs) {
     return lhs.Equals(rhs);
@@ -125,9 +157,13 @@ struct ICEBERG_EXPORT SnapshotRef {
 };
 
 /// \brief Optional Snapshot Summary Fields
-struct SnapshotSummaryFields {
+struct ICEBERG_EXPORT SnapshotSummaryFields {
   /// \brief The operation field key
   inline static const std::string kOperation = "operation";
+  /// \brief The first row id field key
+  inline static const std::string kFirstRowId = "first-row-id";
+  /// \brief The added rows field key
+  inline static const std::string kAddedRows = "added-rows";
 
   /// Metrics, see https://iceberg.apache.org/spec/#metrics
 
@@ -246,12 +282,44 @@ struct ICEBERG_EXPORT Snapshot {
   /// ID of the table's current schema when the snapshot was created.
   std::optional<int32_t> schema_id;
 
+  /// \brief Create a new Snapshot instance with validation on the inputs.
+  static Result<std::unique_ptr<Snapshot>> Make(
+      int64_t sequence_number, int64_t snapshot_id,
+      std::optional<int64_t> parent_snapshot_id, TimePointMs timestamp_ms,
+      std::string operation, std::unordered_map<std::string, std::string> 
summary,
+      std::optional<int32_t> schema_id, std::string manifest_list,
+      std::optional<int64_t> first_row_id = std::nullopt,
+      std::optional<int64_t> added_rows = std::nullopt);
+
   /// \brief Return the name of the DataOperations data operation that 
produced this
   /// snapshot.
   ///
   /// \return the operation that produced this snapshot, or nullopt if the 
operation is
   /// unknown.
-  std::optional<std::string_view> operation() const;
+  std::optional<std::string_view> Operation() const;
+
+  /// \brief The row-id of the first newly added row in this snapshot.
+  ///
+  /// All rows added in this snapshot will have a row-id assigned to them 
greater than
+  /// this value. All rows with a row-id less than this value were created in 
a snapshot
+  /// that was added to the table (but not necessarily committed to this 
branch) in the
+  /// past.
+  ///
+  /// \return the first row-id to be used in this snapshot or nullopt when row 
lineage
+  /// is not supported
+  Result<std::optional<int64_t>> FirstRowId() const;
+
+  /// \brief The upper bound of number of rows with assigned row IDs in this 
snapshot.
+  ///
+  /// It can be used safely to increment the table's `next-row-id` during a 
commit. It
+  /// can be more than the number of rows added in this snapshot and include 
some
+  /// existing rows.
+  ///
+  /// This field is optional but is required when the table version supports 
row lineage.
+  ///
+  /// \return the upper bound of number of rows with assigned row IDs in this 
snapshot
+  /// or nullopt if the value was not stored.
+  Result<std::optional<int64_t>> AddedRows() const;
 
   /// \brief Compare two snapshots for equality.
   friend bool operator==(const Snapshot& lhs, const Snapshot& rhs) {
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 31139585..77fe763f 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -50,7 +50,7 @@ class ICEBERG_EXPORT Table : public 
std::enable_shared_from_this<Table> {
 
   virtual ~Table();
 
-  /// \brief Return the identifier of this table
+  /// \brief Returns the identifier of this table
   const TableIdentifier& name() const { return identifier_; }
 
   /// \brief Returns the UUID of the table
@@ -59,40 +59,40 @@ class ICEBERG_EXPORT Table : public 
std::enable_shared_from_this<Table> {
   /// \brief Return the schema for this table, return NotFoundError if not 
found
   Result<std::shared_ptr<Schema>> schema() const;
 
-  /// \brief Return a map of schema for this table
+  /// \brief Returns a map of schema for this table
   Result<
       std::reference_wrapper<const std::unordered_map<int32_t, 
std::shared_ptr<Schema>>>>
   schemas() const;
 
-  /// \brief Return the partition spec for this table, return NotFoundError if 
not found
+  /// \brief Returns the partition spec for this table, return NotFoundError 
if not found
   Result<std::shared_ptr<PartitionSpec>> spec() const;
 
-  /// \brief Return a map of partition specs for this table
+  /// \brief Returns a map of partition specs for this table
   Result<std::reference_wrapper<
       const std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>>>>
   specs() const;
 
-  /// \brief Return the sort order for this table, return NotFoundError if not 
found
+  /// \brief Returns the sort order for this table, return NotFoundError if 
not found
   Result<std::shared_ptr<SortOrder>> sort_order() const;
 
-  /// \brief Return a map of sort order IDs to sort orders for this table
+  /// \brief Returns a map of sort order IDs to sort orders for this table
   Result<std::reference_wrapper<
       const std::unordered_map<int32_t, std::shared_ptr<SortOrder>>>>
   sort_orders() const;
 
-  /// \brief Return a map of string properties for this table
+  /// \brief Returns the properties of this table
   const TableProperties& properties() const;
 
-  /// \brief Return the table's metadata file location
+  /// \brief Returns the table's metadata file location
   std::string_view metadata_file_location() const;
 
-  /// \brief Return the table's base location
+  /// \brief Returns the table's base location
   std::string_view location() const;
 
   /// \brief Returns the time when this table was last updated
   TimePointMs last_updated_ms() const;
 
-  /// \brief Return the table's current snapshot, return NotFoundError if not 
found
+  /// \brief Returns the table's current snapshot, return NotFoundError if not 
found
   Result<std::shared_ptr<Snapshot>> current_snapshot() const;
 
   /// \brief Get the snapshot of this table with the given id
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 851048b3..22eb739b 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -34,6 +34,7 @@
 
 #include <nlohmann/json.hpp>
 
+#include "iceberg/constants.h"
 #include "iceberg/exception.h"
 #include "iceberg/file_io.h"
 #include "iceberg/json_internal.h"
@@ -52,6 +53,7 @@
 #include "iceberg/util/location_util.h"
 #include "iceberg/util/macros.h"
 #include "iceberg/util/property_util.h"
+#include "iceberg/util/timepoint.h"
 #include "iceberg/util/type_util.h"
 #include "iceberg/util/uuid.h"
 
@@ -244,26 +246,38 @@ Result<std::shared_ptr<Schema>> 
TableMetadata::SchemaById(int32_t schema_id) con
 }
 
 Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
-  auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) {
-    return spec != nullptr && spec->spec_id() == default_spec_id;
+  return PartitionSpecById(default_spec_id);
+}
+
+Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpecById(
+    int32_t spec_id) const {
+  auto iter = std::ranges::find_if(partition_specs, [spec_id](const auto& 
spec) {
+    return spec != nullptr && spec->spec_id() == spec_id;
   });
   if (iter == partition_specs.end()) {
-    return NotFound("Default partition spec is not found");
+    return NotFound("Partition spec with ID {} is not found", spec_id);
   }
   return *iter;
 }
 
 Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
-  auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) {
-    return order != nullptr && order->order_id() == default_sort_order_id;
+  return SortOrderById(default_sort_order_id);
+}
+
+Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrderById(int32_t 
order_id) const {
+  auto iter = std::ranges::find_if(sort_orders, [order_id](const auto& order) {
+    return order != nullptr && order->order_id() == order_id;
   });
   if (iter == sort_orders.end()) {
-    return NotFound("Default sort order is not found");
+    return NotFound("Sort order with ID {} is not found", order_id);
   }
   return *iter;
 }
 
 Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
+  if (current_snapshot_id == kInvalidSnapshotId) {
+    return NotFound("No current snapshot");
+  }
   return SnapshotById(current_snapshot_id);
 }
 
@@ -277,6 +291,10 @@ Result<std::shared_ptr<Snapshot>> 
TableMetadata::SnapshotById(int64_t snapshot_i
   return *iter;
 }
 
+int64_t TableMetadata::NextSequenceNumber() const {
+  return format_version > 1 ? last_sequence_number + 1 : 
kInitialSequenceNumber;
+}
+
 namespace {
 
 template <typename T>
@@ -555,6 +573,10 @@ class TableMetadataBuilder::Impl {
       sort_orders_by_id_.emplace(order->order_id(), order);
     }
 
+    for (const auto& snapshot : metadata_.snapshots) {
+      snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot);
+    }
+
     metadata_.last_updated_ms = kInvalidLastUpdatedMs;
   }
 
@@ -591,6 +613,10 @@ class TableMetadataBuilder::Impl {
   Status RemoveSchemas(const std::unordered_set<int32_t>& schema_ids);
   Result<int32_t> AddSchema(const Schema& schema, int32_t new_last_column_id);
   void SetLocation(std::string_view location);
+  Status AddSnapshot(std::shared_ptr<Snapshot> snapshot);
+  Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch);
+  Status SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot, const 
std::string& branch);
+  Status SetRef(const std::string& name, std::shared_ptr<SnapshotRef> ref);
 
   Result<std::unique_ptr<TableMetadata>> Build();
 
@@ -613,6 +639,34 @@ class TableMetadataBuilder::Impl {
   /// \return The ID to use for this schema (reused if exists, new otherwise
   int32_t ReuseOrCreateNewSchemaId(const Schema& new_schema) const;
 
+  /// \brief Finds intermediate snapshots that have not been committed as the 
current
+  /// snapshot.
+  ///
+  /// Transactions can create snapshots that are never the current snapshot 
because
+  /// several changes are combined by the transaction into one table metadata 
update. When
+  /// each intermediate snapshot is added to table metadata, it is added to 
the snapshot
+  /// log, assuming that it will be the current snapshot. When there are 
multiple snapshot
+  /// updates, the log must be corrected by suppressing the intermediate 
snapshot entries.
+  ///
+  /// A snapshot is an intermediate snapshot if it was added but is not the 
current
+  /// snapshot.
+  ///
+  /// \param current_snapshot_id The current snapshot ID
+  /// \return A set of snapshot IDs for all added snapshots that were later 
replaced as
+  /// the current snapshot in changes
+  std::unordered_set<int64_t> IntermediateSnapshotIdSet(
+      int64_t current_snapshot_id) const;
+
+  /// \brief Updates the snapshot log by removing intermediate snapshots and 
handling
+  /// removed snapshots.
+  ///
+  /// \param current_snapshot_id The current snapshot ID
+  /// \return Updated snapshot log or error
+  Result<std::vector<SnapshotLogEntry>> UpdateSnapshotLog(
+      int64_t current_snapshot_id) const;
+
+  Status SetBranchSnapshotInternal(const Snapshot& snapshot, const 
std::string& branch);
+
  private:
   // Base metadata (nullptr for new tables)
   const TableMetadata* base_;
@@ -634,6 +688,7 @@ class TableMetadataBuilder::Impl {
   std::unordered_map<int32_t, std::shared_ptr<Schema>> schemas_by_id_;
   std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> specs_by_id_;
   std::unordered_map<int32_t, std::shared_ptr<SortOrder>> sort_orders_by_id_;
+  std::unordered_map<int64_t, std::shared_ptr<Snapshot>> snapshots_by_id_;
 };
 
 Status TableMetadataBuilder::Impl::AssignUUID(std::string_view uuid) {
@@ -982,6 +1037,206 @@ void 
TableMetadataBuilder::Impl::SetLocation(std::string_view location) {
   
changes_.push_back(std::make_unique<table::SetLocation>(std::string(location)));
 }
 
+Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr<Snapshot> 
snapshot) {
+  if (snapshot == nullptr) {
+    // change is a noop
+    return {};
+  }
+  ICEBERG_CHECK(!metadata_.schemas.empty(),
+                "Attempting to add a snapshot before a schema is added");
+  ICEBERG_CHECK(!metadata_.partition_specs.empty(),
+                "Attempting to add a snapshot before a partition spec is 
added");
+  ICEBERG_CHECK(!metadata_.sort_orders.empty(),
+                "Attempting to add a snapshot before a sort order is added");
+  ICEBERG_CHECK(!snapshots_by_id_.contains(snapshot->snapshot_id),
+                "Snapshot already exists for id: {}", snapshot->snapshot_id);
+  ICEBERG_CHECK(
+      metadata_.format_version == 1 ||
+          snapshot->sequence_number > metadata_.last_sequence_number ||
+          !snapshot->parent_snapshot_id.has_value(),
+      "Cannot add snapshot with sequence number {} older than last sequence 
number {}",
+      snapshot->sequence_number, metadata_.last_sequence_number);
+
+  metadata_.last_updated_ms = snapshot->timestamp_ms;
+  metadata_.last_sequence_number = snapshot->sequence_number;
+  metadata_.snapshots.push_back(snapshot);
+  snapshots_by_id_.emplace(snapshot->snapshot_id, snapshot);
+  changes_.push_back(std::make_unique<table::AddSnapshot>(snapshot));
+
+  if (metadata_.format_version >= TableMetadata::kMinFormatVersionRowLineage) {
+    ICEBERG_ASSIGN_OR_RAISE(auto first_row_id, snapshot->FirstRowId());
+    ICEBERG_CHECK(first_row_id.has_value(),
+                  "Cannot add a snapshot: first-row-id is null");
+    ICEBERG_CHECK(
+        first_row_id.value() >= metadata_.next_row_id,
+        "Cannot add a snapshot, first-row-id is behind table next-row-id: {} < 
{}",
+        first_row_id.value(), metadata_.next_row_id);
+
+    ICEBERG_ASSIGN_OR_RAISE(auto add_rows, snapshot->AddedRows());
+    ICEBERG_CHECK(add_rows.has_value(), "Cannot add a snapshot: added-rows is 
null");
+    metadata_.next_row_id += add_rows.value();
+  }
+
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id,
+                                                     const std::string& 
branch) {
+  auto ref_it = metadata_.refs.find(branch);
+  if (ref_it != metadata_.refs.end() && ref_it->second->snapshot_id == 
snapshot_id) {
+    // change is a noop
+    return {};
+  }
+
+  auto snapshot_it = snapshots_by_id_.find(snapshot_id);
+  ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(),
+                "Cannot set {} to unknown snapshot: {}", branch, snapshot_id);
+  return SetBranchSnapshotInternal(*snapshot_it->second, branch);
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr<Snapshot> 
snapshot,
+                                                     const std::string& 
branch) {
+  if (snapshot == nullptr) {
+    // change is a noop
+    return {};
+  }
+  const Snapshot& snapshot_ref = *snapshot;
+  ICEBERG_RETURN_UNEXPECTED(AddSnapshot(std::move(snapshot)));
+  return SetBranchSnapshotInternal(snapshot_ref, branch);
+}
+
+Status TableMetadataBuilder::Impl::SetBranchSnapshotInternal(const Snapshot& 
snapshot,
+                                                             const 
std::string& branch) {
+  const int64_t replacement_snapshot_id = snapshot.snapshot_id;
+  auto ref_it = metadata_.refs.find(branch);
+  if (ref_it != metadata_.refs.end()) {
+    ICEBERG_CHECK(ref_it->second->type() == SnapshotRefType::kBranch,
+                  "Cannot update branch: {} is a tag", branch);
+    if (ref_it->second->snapshot_id == replacement_snapshot_id) {
+      return {};
+    }
+  }
+
+  ICEBERG_CHECK(
+      metadata_.format_version == 1 ||
+          snapshot.sequence_number <= metadata_.last_sequence_number,
+      "Last sequence number {} is less than existing snapshot sequence number 
{}",
+      metadata_.last_sequence_number, snapshot.sequence_number);
+
+  std::shared_ptr<SnapshotRef> new_ref;
+  if (ref_it != metadata_.refs.end()) {
+    new_ref = ref_it->second->Clone(replacement_snapshot_id);
+  } else {
+    ICEBERG_ASSIGN_OR_RAISE(new_ref, 
SnapshotRef::MakeBranch(replacement_snapshot_id));
+  }
+
+  return SetRef(branch, std::move(new_ref));
+}
+
+Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
+                                          std::shared_ptr<SnapshotRef> ref) {
+  auto existing_ref_it = metadata_.refs.find(name);
+  if (existing_ref_it != metadata_.refs.end() && *existing_ref_it->second == 
*ref) {
+    return {};
+  }
+
+  int64_t snapshot_id = ref->snapshot_id;
+  auto snapshot_it = snapshots_by_id_.find(snapshot_id);
+  ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(),
+                "Cannot set {} to unknown snapshot: {}", name, snapshot_id);
+  const auto& snapshot = snapshot_it->second;
+
+  // If snapshot was added in this set of changes, update last_updated_ms
+  if (std::ranges::any_of(changes_, [snapshot_id](const auto& change) {
+        return change->kind() == TableUpdate::Kind::kAddSnapshot &&
+               internal::checked_cast<const table::AddSnapshot&>(*change)
+                       .snapshot()
+                       ->snapshot_id == snapshot_id;
+      })) {
+    metadata_.last_updated_ms = snapshot->timestamp_ms;
+  }
+
+  if (name == SnapshotRef::kMainBranch) {
+    metadata_.current_snapshot_id = ref->snapshot_id;
+    if (metadata_.last_updated_ms == kInvalidLastUpdatedMs) {
+      metadata_.last_updated_ms = CurrentTimePointMs();
+    }
+    metadata_.snapshot_log.emplace_back(metadata_.last_updated_ms, 
ref->snapshot_id);
+  }
+
+  changes_.push_back(std::make_unique<table::SetSnapshotRef>(name, *ref));
+  metadata_.refs[name] = std::move(ref);
+
+  return {};
+}
+
+std::unordered_set<int64_t> 
TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
+    int64_t current_snapshot_id) const {
+  std::unordered_set<int64_t> added_snapshot_ids;
+  std::unordered_set<int64_t> intermediate_snapshot_ids;
+
+  std::ranges::for_each(changes_, [&](const auto& change) {
+    if (change->kind() == TableUpdate::Kind::kAddSnapshot) {
+      // Adds must always come before set current snapshot
+      const auto& added_snapshot =
+          internal::checked_cast<const table::AddSnapshot&>(*change);
+      added_snapshot_ids.insert(added_snapshot.snapshot()->snapshot_id);
+    } else if (change->kind() == TableUpdate::Kind::kSetSnapshotRef) {
+      const auto& set_ref = internal::checked_cast<const 
table::SetSnapshotRef&>(*change);
+      int64_t snapshot_id = set_ref.snapshot_id();
+      if (added_snapshot_ids.contains(snapshot_id) &&
+          set_ref.ref_name() == SnapshotRef::kMainBranch &&
+          snapshot_id != current_snapshot_id) {
+        intermediate_snapshot_ids.insert(snapshot_id);
+      }
+    }
+  });
+
+  return intermediate_snapshot_ids;
+}
+
+Result<std::vector<SnapshotLogEntry>> 
TableMetadataBuilder::Impl::UpdateSnapshotLog(
+    int64_t current_snapshot_id) const {
+  std::unordered_set<int64_t> intermediate_snapshot_ids =
+      IntermediateSnapshotIdSet(current_snapshot_id);
+  const bool has_removed_snapshots =
+      std::ranges::any_of(changes_, [](const auto& change) {
+        return change->kind() == TableUpdate::Kind::kRemoveSnapshots;
+      });
+  if (intermediate_snapshot_ids.empty() && !has_removed_snapshots) {
+    return metadata_.snapshot_log;
+  }
+
+  // Update the snapshot log
+  std::vector<SnapshotLogEntry> new_snapshot_log;
+  for (const auto& log_entry : metadata_.snapshot_log) {
+    int64_t snapshot_id = log_entry.snapshot_id;
+    if (snapshots_by_id_.contains(snapshot_id)) {
+      if (!intermediate_snapshot_ids.contains(snapshot_id)) {
+        // Copy the log entries that are still valid
+        new_snapshot_log.push_back(log_entry);
+      }
+    } else if (has_removed_snapshots) {
+      // Any invalid entry causes the history before it to be removed. 
Otherwise, there
+      // could be history gaps that cause time-travel queries to produce 
incorrect
+      // results. For example, if history is [(t1, s1), (t2, s2), (t3, s3)] 
and s2 is
+      // removed, the history cannot be [(t1, s1), (t3, s3)] because it 
appears that s3
+      // was current during the time between t2 and t3 when in fact s2 was the 
current
+      // snapshot.
+      new_snapshot_log.clear();
+    }
+  }
+
+  if (snapshots_by_id_.contains(current_snapshot_id)) {
+    ICEBERG_CHECK(
+        !new_snapshot_log.empty() &&
+            new_snapshot_log.back().snapshot_id == current_snapshot_id,
+        "Cannot set invalid snapshot log: latest entry is not the current 
snapshot");
+  }
+
+  return new_snapshot_log;
+}
+
 Result<std::unique_ptr<TableMetadata>> TableMetadataBuilder::Impl::Build() {
   // 1. Validate metadata consistency through TableMetadata#Validate
 
@@ -1025,7 +1280,9 @@ Result<std::unique_ptr<TableMetadata>> 
TableMetadataBuilder::Impl::Build() {
                                  metadata_.metadata_log.end() - 
max_metadata_log_size);
   }
 
-  // TODO(anyone): 4. update snapshot_log
+  // 4. Update snapshot_log
+  ICEBERG_ASSIGN_OR_RAISE(metadata_.snapshot_log,
+                          UpdateSnapshotLog(metadata_.current_snapshot_id));
 
   // 5. Create and return the TableMetadata
   return std::make_unique<TableMetadata>(std::move(metadata_));
@@ -1207,17 +1464,26 @@ TableMetadataBuilder& 
TableMetadataBuilder::AddSortOrder(
 
 TableMetadataBuilder& TableMetadataBuilder::AddSnapshot(
     std::shared_ptr<Snapshot> snapshot) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->AddSnapshot(snapshot));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(int64_t 
snapshot_id,
                                                               const 
std::string& branch) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(snapshot_id, 
branch));
+  return *this;
+}
+
+TableMetadataBuilder& TableMetadataBuilder::SetBranchSnapshot(
+    std::shared_ptr<Snapshot> snapshot, const std::string& branch) {
+  
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetBranchSnapshot(std::move(snapshot), 
branch));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::SetRef(const std::string& name,
                                                    
std::shared_ptr<SnapshotRef> ref) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetRef(name, std::move(ref)));
+  return *this;
 }
 
 TableMetadataBuilder& TableMetadataBuilder::RemoveRef(const std::string& name) 
{
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index 3e3eb9c7..a4165b81 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -134,17 +134,32 @@ struct ICEBERG_EXPORT TableMetadata {
       int format_version = kDefaultTableFormatVersion);
 
   /// \brief Get the current schema, return NotFoundError if not found
+  /// \note The returned schema is guaranteed to be not null
   Result<std::shared_ptr<iceberg::Schema>> Schema() const;
   /// \brief Get the current schema by ID, return NotFoundError if not found
+  /// \note The returned schema is guaranteed to be not null
   Result<std::shared_ptr<iceberg::Schema>> SchemaById(int32_t schema_id) const;
   /// \brief Get the current partition spec, return NotFoundError if not found
+  /// \note The returned partition spec is guaranteed to be not null
   Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
+  /// \brief Get the current partition spec by ID, return NotFoundError if not 
found
+  /// \note The returned partition spec is guaranteed to be not null
+  Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpecById(
+      int32_t spec_id) const;
   /// \brief Get the current sort order, return NotFoundError if not found
+  /// \note The returned sort order is guaranteed to be not null
   Result<std::shared_ptr<iceberg::SortOrder>> SortOrder() const;
+  /// \brief Get the current sort order by ID, return NotFoundError if not 
found
+  /// \note The returned sort order is guaranteed to be not null
+  Result<std::shared_ptr<iceberg::SortOrder>> SortOrderById(int32_t 
sort_order_id) const;
   /// \brief Get the current snapshot, return NotFoundError if not found
+  /// \note The returned snapshot is guaranteed to be not null
   Result<std::shared_ptr<iceberg::Snapshot>> Snapshot() const;
-  /// \brief Get the snapshot of this table with the given id
+  /// \brief Get the snapshot by ID, return NotFoundError if not found
+  /// \note The returned snapshot is guaranteed to be not null
   Result<std::shared_ptr<iceberg::Snapshot>> SnapshotById(int64_t snapshot_id) 
const;
+  /// \brief Get the next sequence number
+  int64_t NextSequenceNumber() const;
 
   ICEBERG_EXPORT friend bool operator==(const TableMetadata& lhs,
                                         const TableMetadata& rhs);
@@ -337,6 +352,14 @@ class ICEBERG_EXPORT TableMetadataBuilder : public 
ErrorCollector {
   /// \return Reference to this builder for method chaining
   TableMetadataBuilder& SetBranchSnapshot(int64_t snapshot_id, const 
std::string& branch);
 
+  /// \brief Set a branch to point to a specific snapshot
+  ///
+  /// \param snapshot The snapshot the branch should reference
+  /// \param branch The name of the branch
+  /// \return Reference to this builder for method chaining
+  TableMetadataBuilder& SetBranchSnapshot(std::shared_ptr<Snapshot> snapshot,
+                                          const std::string& branch);
+
   /// \brief Set a snapshot reference
   ///
   /// \param name The name of the reference
diff --git a/src/iceberg/table_properties.h b/src/iceberg/table_properties.h
index feb4a200..5d5c17db 100644
--- a/src/iceberg/table_properties.h
+++ b/src/iceberg/table_properties.h
@@ -20,7 +20,6 @@
 #pragma once
 
 #include <limits>
-#include <memory>
 #include <string>
 #include <unordered_map>
 #include <unordered_set>
@@ -244,6 +243,9 @@ class ICEBERG_EXPORT TableProperties : public 
ConfigBase<TableProperties> {
   inline static Entry<int64_t> kDeleteTargetFileSizeBytes{
       "write.delete.target-file-size-bytes", int64_t{64} * 1024 * 1024};  // 
64 MB
 
+  inline static Entry<bool> kSnapshotIdInheritanceEnabled{
+      "compatibility.snapshot-id-inheritance.enabled", false};
+
   // Garbage collection properties
 
   inline static Entry<bool> kGcEnabled{"gc.enabled", true};
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 38ce0fbc..29388d47 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -274,7 +274,7 @@ std::unique_ptr<TableUpdate> SetDefaultSortOrder::Clone() 
const {
 // AddSnapshot
 
 void AddSnapshot::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.AddSnapshot(snapshot_);
 }
 
 void AddSnapshot::GenerateRequirements(TableUpdateContext& context) const {
@@ -344,7 +344,7 @@ std::unique_ptr<TableUpdate> RemoveSnapshotRef::Clone() 
const {
 // SetSnapshotRef
 
 void SetSnapshotRef::ApplyTo(TableMetadataBuilder& builder) const {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  builder.SetBranchSnapshot(snapshot_id_, ref_name_);
 }
 
 void SetSnapshotRef::GenerateRequirements(TableUpdateContext& context) const {
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 87524319..3c9c9dbb 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -401,6 +401,19 @@ class ICEBERG_EXPORT SetSnapshotRef : public TableUpdate {
         max_snapshot_age_ms_(max_snapshot_age_ms),
         max_ref_age_ms_(max_ref_age_ms) {}
 
+  SetSnapshotRef(std::string ref_name, const SnapshotRef& ref)
+      : ref_name_(std::move(ref_name)), snapshot_id_(ref.snapshot_id), 
type_(ref.type()) {
+    if (type_ == SnapshotRefType::kBranch) {
+      const auto& branch = std::get<SnapshotRef::Branch>(ref.retention);
+      min_snapshots_to_keep_ = branch.min_snapshots_to_keep;
+      max_snapshot_age_ms_ = branch.max_snapshot_age_ms;
+      max_ref_age_ms_ = branch.max_ref_age_ms;
+    } else {
+      const auto& tag = std::get<SnapshotRef::Tag>(ref.retention);
+      max_ref_age_ms_ = tag.max_ref_age_ms;
+    }
+  }
+
   const std::string& ref_name() const { return ref_name_; }
   int64_t snapshot_id() const { return snapshot_id_; }
   SnapshotRefType type() const { return type_; }
diff --git a/src/iceberg/test/json_internal_test.cc 
b/src/iceberg/test/json_internal_test.cc
index f88527ff..33ed03b9 100644
--- a/src/iceberg/test/json_internal_test.cc
+++ b/src/iceberg/test/json_internal_test.cc
@@ -269,7 +269,7 @@ TEST(JsonInternalTest, 
SnapshotFromJsonSummaryWithNoOperation) {
   auto result = SnapshotFromJson(snapshot_json);
   ASSERT_TRUE(result.has_value());
 
-  ASSERT_EQ(result.value()->operation(), DataOperation::kOverwrite);
+  ASSERT_EQ(result.value()->Operation(), DataOperation::kOverwrite);
 }
 
 TEST(JsonInternalTest, NameMapping) {
diff --git a/src/iceberg/test/snapshot_test.cc 
b/src/iceberg/test/snapshot_test.cc
index a3c28f89..e7a657fb 100644
--- a/src/iceberg/test/snapshot_test.cc
+++ b/src/iceberg/test/snapshot_test.cc
@@ -96,7 +96,7 @@ TEST_F(SnapshotTest, ConstructionAndFieldAccess) {
   EXPECT_EQ(snapshot.sequence_number, 1);
   EXPECT_EQ(snapshot.timestamp_ms.time_since_epoch().count(), 1615569200000);
   EXPECT_EQ(snapshot.manifest_list, "s3://example/manifest_list.avro");
-  EXPECT_EQ(snapshot.operation().value(), DataOperation::kAppend);
+  EXPECT_EQ(snapshot.Operation(), std::make_optional(DataOperation::kAppend));
   
EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kAddedDataFiles)),
             "101");
   
EXPECT_EQ(snapshot.summary.at(std::string(SnapshotSummaryFields::kOperation)),
diff --git a/src/iceberg/test/table_requirements_test.cc 
b/src/iceberg/test/table_requirements_test.cc
index 80f83636..dd0aa8f6 100644
--- a/src/iceberg/test/table_requirements_test.cc
+++ b/src/iceberg/test/table_requirements_test.cc
@@ -883,12 +883,12 @@ TEST(TableRequirementsTest, SetSnapshotRef) {
 
   // Multiple updates to same ref should deduplicate
   std::vector<std::unique_ptr<TableUpdate>> updates;
-  updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, 
kSnapshotId,
-                                                            
SnapshotRefType::kBranch));
-  updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, 
kSnapshotId + 1,
-                                                            
SnapshotRefType::kBranch));
-  updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, 
kSnapshotId + 2,
-                                                            
SnapshotRefType::kBranch));
+  ICEBERG_UNWRAP_OR_FAIL(auto ref1, SnapshotRef::MakeBranch(kSnapshotId));
+  updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, *ref1));
+  ICEBERG_UNWRAP_OR_FAIL(auto ref2, SnapshotRef::MakeBranch(kSnapshotId + 1));
+  updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, *ref2));
+  ICEBERG_UNWRAP_OR_FAIL(auto ref3, SnapshotRef::MakeBranch(kSnapshotId + 2));
+  updates.push_back(std::make_unique<table::SetSnapshotRef>(kRefName, *ref3));
 
   auto result = TableRequirements::ForUpdateTable(*metadata, updates);
   ASSERT_THAT(result, IsOk());
diff --git a/src/iceberg/test/update_properties_test.cc 
b/src/iceberg/test/update_properties_test.cc
index 8ac8e5eb..59fa1d8d 100644
--- a/src/iceberg/test/update_properties_test.cc
+++ b/src/iceberg/test/update_properties_test.cc
@@ -107,7 +107,7 @@ TEST_F(UpdatePropertiesTest, 
UpgradeFormatVersionInvalidString) {
 
   auto result = update->Apply();
   EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument));
-  EXPECT_THAT(result, HasErrorMessage("Invalid format version"));
+  EXPECT_THAT(result, HasErrorMessage("Failed to parse integer from string"));
 }
 
 TEST_F(UpdatePropertiesTest, UpgradeFormatVersionOutOfRange) {
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 6641a1af..6ef942db 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -20,20 +20,24 @@
 #include "iceberg/transaction.h"
 
 #include <memory>
+#include <optional>
 
 #include "iceberg/catalog.h"
 #include "iceberg/schema.h"
 #include "iceberg/table.h"
 #include "iceberg/table_metadata.h"
+#include "iceberg/table_properties.h"
 #include "iceberg/table_requirement.h"
 #include "iceberg/table_requirements.h"
 #include "iceberg/table_update.h"
 #include "iceberg/update/pending_update.h"
+#include "iceberg/update/snapshot_update.h"
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
 #include "iceberg/update/update_sort_order.h"
 #include "iceberg/util/checked_cast.h"
+#include "iceberg/util/location_util.h"
 #include "iceberg/util/macros.h"
 
 namespace iceberg {
@@ -69,6 +73,16 @@ const TableMetadata* Transaction::base() const { return 
metadata_builder_->base(
 
 const TableMetadata& Transaction::current() const { return 
metadata_builder_->current(); }
 
+std::string Transaction::MetadataFileLocation(std::string_view filename) const 
{
+  const auto metadata_location =
+      current().properties.Get(TableProperties::kWriteMetadataLocation);
+  if (metadata_location.empty()) {
+    return std::format("{}/{}", 
LocationUtil::StripTrailingSlash(metadata_location),
+                       filename);
+  }
+  return std::format("{}/metadata/{}", current().location, filename);
+}
+
 Status Transaction::AddUpdate(const std::shared_ptr<PendingUpdate>& update) {
   if (!last_update_committed_) {
     return InvalidArgument("Cannot add update when previous update is not 
committed");
@@ -113,6 +127,42 @@ Status Transaction::Apply(PendingUpdate& update) {
       metadata_builder_->SetCurrentSchema(std::move(result.schema),
                                           result.new_last_column_id);
     } break;
+    case PendingUpdate::Kind::kUpdateSnapshot: {
+      const auto& base = metadata_builder_->current();
+
+      auto& update_snapshot = internal::checked_cast<SnapshotUpdate&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, update_snapshot.Apply());
+
+      // Create a temp builder to check if this is an empty update
+      auto temp_update = TableMetadataBuilder::BuildFrom(&base);
+      if (base.SnapshotById(result.snapshot->snapshot_id).has_value()) {
+        // This is a rollback operation
+        temp_update->SetBranchSnapshot(result.snapshot->snapshot_id,
+                                       result.target_branch);
+      } else if (result.stage_only) {
+        temp_update->AddSnapshot(result.snapshot);
+      } else {
+        temp_update->SetBranchSnapshot(std::move(result.snapshot), 
result.target_branch);
+      }
+
+      if (temp_update->changes().empty()) {
+        // Do not commit if the metadata has not changed. for example, this 
may happen
+        // when setting the current snapshot to an ID that is already current. 
note that
+        // this check uses identity.
+        return {};
+      }
+
+      for (const auto& change : temp_update->changes()) {
+        change->ApplyTo(*metadata_builder_);
+      }
+
+      // If the table UUID is missing, add it here. the UUID will be 
re-created each time
+      // this operation retries to ensure that if a concurrent operation 
assigns the UUID,
+      // this operation will not fail.
+      if (base.table_uuid.empty()) {
+        metadata_builder_->AssignUUID();
+      }
+    } break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int32_t>(update.kind()));
@@ -155,12 +205,22 @@ Result<std::shared_ptr<Table>> Transaction::Commit() {
   }
 
   // XXX: we should handle commit failure and retry here.
-  ICEBERG_ASSIGN_OR_RAISE(auto updated_table, table_->catalog()->UpdateTable(
-                                                  table_->name(), 
requirements, updates));
+  auto commit_result =
+      table_->catalog()->UpdateTable(table_->name(), requirements, updates);
+
+  for (const auto& update : pending_updates_) {
+    if (auto update_ptr = update.lock()) {
+      std::ignore = update_ptr->Finalize(commit_result.has_value()
+                                             ? std::nullopt
+                                             : 
std::make_optional(commit_result.error()));
+    }
+  }
+
+  ICEBERG_RETURN_UNEXPECTED(commit_result);
 
   // Mark as committed and update table reference
   committed_ = true;
-  table_ = std::move(updated_table);
+  table_ = std::move(commit_result.value());
 
   return table_;
 }
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index ea918a17..3c2395c2 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -49,6 +49,12 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   /// \brief Return the current metadata with staged changes applied
   const TableMetadata& current() const;
 
+  /// \brief Return the location of the metadata file with the given filename
+  ///
+  /// \param filename the name of the metadata file
+  /// \return the location of the metadata file
+  std::string MetadataFileLocation(std::string_view filename) const;
+
   /// \brief Apply the pending changes from all actions and commit.
   ///
   /// \return Updated table if the transaction was committed successfully, or 
an error:
@@ -81,9 +87,9 @@ class ICEBERG_EXPORT Transaction : public 
std::enable_shared_from_this<Transacti
   /// \brief Apply the pending changes to current table.
   Status Apply(PendingUpdate& updates);
 
-  friend class PendingUpdate;  // Need to access the Apply method.
-
  private:
+  friend class PendingUpdate;
+
   // The table that this transaction will update.
   std::shared_ptr<Table> table_;
   // The kind of this transaction.
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 2daf39e6..ff49e1ed 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -188,6 +188,7 @@ class Transaction;
 
 /// \brief Update family.
 class PendingUpdate;
+class SnapshotUpdate;
 class UpdatePartitionSpec;
 class UpdateProperties;
 class UpdateSchema;
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index e4c786f4..4238e022 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -18,6 +18,7 @@
 install_headers(
     [
         'pending_update.h',
+        'snapshot_update.h',
         'update_partition_spec.h',
         'update_schema.h',
         'update_sort_order.h',
diff --git a/src/iceberg/update/pending_update.cc 
b/src/iceberg/update/pending_update.cc
index 535e7b41..e55a93df 100644
--- a/src/iceberg/update/pending_update.cc
+++ b/src/iceberg/update/pending_update.cc
@@ -30,4 +30,10 @@ PendingUpdate::~PendingUpdate() = default;
 
 Status PendingUpdate::Commit() { return transaction_->Apply(*this); }
 
+Status PendingUpdate::Finalize([[maybe_unused]] std::optional<Error> 
commit_error) {
+  return {};
+}
+
+const TableMetadata& PendingUpdate::base() const { return 
transaction_->current(); }
+
 }  // namespace iceberg
diff --git a/src/iceberg/update/pending_update.h 
b/src/iceberg/update/pending_update.h
index 90723987..2124d7e1 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -23,7 +23,7 @@
 /// API for table changes using builder pattern
 
 #include <memory>
-#include <vector>
+#include <optional>
 
 #include "iceberg/iceberg_export.h"
 #include "iceberg/result.h"
@@ -45,6 +45,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
     kUpdatePartitionSpec,
     kUpdateProperties,
     kUpdateSchema,
+    kUpdateSnapshot,
     kUpdateSortOrder,
   };
 
@@ -59,6 +60,15 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
   ///         - CommitStateUnknown: unknown status, no cleanup should be done.
   virtual Status Commit();
 
+  /// \brief Finalize the pending update.
+  ///
+  /// This method is called after the update is committed.
+  /// Implementations should override this method to clean up any resources.
+  ///
+  /// \param commit_error An optional error indicating whether the commit was 
successful
+  /// \return Status indicating success or failure
+  virtual Status Finalize(std::optional<Error> commit_error);
+
   // Non-copyable, movable
   PendingUpdate(const PendingUpdate&) = delete;
   PendingUpdate& operator=(const PendingUpdate&) = delete;
@@ -70,6 +80,8 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
  protected:
   explicit PendingUpdate(std::shared_ptr<Transaction> transaction);
 
+  const TableMetadata& base() const;
+
   std::shared_ptr<Transaction> transaction_;
 };
 
diff --git a/src/iceberg/update/snapshot_update.cc 
b/src/iceberg/update/snapshot_update.cc
new file mode 100644
index 00000000..2bbb2d50
--- /dev/null
+++ b/src/iceberg/update/snapshot_update.cc
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/snapshot_update.h"
+
+#include <format>
+#include <ranges>
+
+#include "iceberg/constants.h"
+#include "iceberg/file_io.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/manifest/rolling_manifest_writer.h"
+#include "iceberg/partition_summary_internal.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/string_util.h"
+#include "iceberg/util/uuid.h"
+
+namespace iceberg {
+
+namespace {
+
+// The Java impl skips updating total if parsing fails. Here we choose to be 
strict.
+Status UpdateTotal(std::unordered_map<std::string, std::string>& summary,
+                   const std::unordered_map<std::string, std::string>& 
previous_summary,
+                   const std::string& total_property, const std::string& 
added_property,
+                   const std::string& deleted_property) {
+  auto total_it = previous_summary.find(total_property);
+  if (total_it != previous_summary.end()) {
+    ICEBERG_ASSIGN_OR_RAISE(auto new_total,
+                            StringUtils::ParseInt<int64_t>(total_it->second));
+
+    auto added_it = summary.find(added_property);
+    if (new_total >= 0 && added_it != summary.end()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto added_value,
+                              
StringUtils::ParseInt<int64_t>(added_it->second));
+      new_total += added_value;
+    }
+
+    auto deleted_it = summary.find(deleted_property);
+    if (new_total >= 0 && deleted_it != summary.end()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto deleted_value,
+                              
StringUtils::ParseInt<int64_t>(deleted_it->second));
+      new_total -= deleted_value;
+    }
+
+    if (new_total >= 0) {
+      summary[total_property] = std::to_string(new_total);
+    }
+  }
+  return {};
+}
+
+// Add metadata to a manifest file by reading it and extracting statistics.
+Result<ManifestFile> AddMetadata(const ManifestFile& manifest, 
std::shared_ptr<FileIO> io,
+                                 const TableMetadata& metadata) {
+  ICEBERG_PRECHECK(manifest.added_snapshot_id != kInvalidSnapshotId,
+                   "Manifest {} already has assigned a snapshot id: {}",
+                   manifest.manifest_path, manifest.added_snapshot_id);
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, metadata.Schema());
+  ICEBERG_ASSIGN_OR_RAISE(auto spec,
+                          
metadata.PartitionSpecById(manifest.partition_spec_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_type, spec->PartitionType(*schema));
+
+  ICEBERG_ASSIGN_OR_RAISE(auto reader,
+                          ManifestReader::Make(manifest, std::move(io), 
schema, spec));
+  ICEBERG_ASSIGN_OR_RAISE(auto entries, reader->Entries());
+
+  PartitionSummary stats(*partition_type);
+  int32_t added_files = 0;
+  int64_t added_rows = 0;
+  int32_t existing_files = 0;
+  int64_t existing_rows = 0;
+  int32_t deleted_files = 0;
+  int64_t deleted_rows = 0;
+
+  std::optional<int64_t> snapshot_id;
+  int64_t max_snapshot_id = std::numeric_limits<int64_t>::min();
+  for (const auto& entry : entries) {
+    ICEBERG_PRECHECK(entry.data_file != nullptr,
+                     "Manifest entry in {} is missing data_file", 
manifest.manifest_path);
+
+    if (entry.snapshot_id.has_value() && entry.snapshot_id.value() > 
max_snapshot_id) {
+      max_snapshot_id = entry.snapshot_id.value();
+    }
+
+    switch (entry.status) {
+      case ManifestStatus::kAdded: {
+        added_files += 1;
+        added_rows += entry.data_file->record_count;
+        if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) {
+          snapshot_id = entry.snapshot_id;
+        }
+      } break;
+      case ManifestStatus::kExisting: {
+        existing_files += 1;
+        existing_rows += entry.data_file->record_count;
+      } break;
+      case ManifestStatus::kDeleted: {
+        deleted_files += 1;
+        deleted_rows += entry.data_file->record_count;
+        if (!snapshot_id.has_value() && entry.snapshot_id.has_value()) {
+          snapshot_id = entry.snapshot_id;
+        }
+      } break;
+    }
+
+    ICEBERG_RETURN_UNEXPECTED(stats.Update(entry.data_file->partition));
+  }
+
+  if (!snapshot_id.has_value()) {
+    // If no files were added or deleted, use the largest snapshot ID in the 
manifest
+    snapshot_id = max_snapshot_id;
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto partition_summaries, stats.Summaries());
+
+  ManifestFile enriched = manifest;
+  enriched.added_snapshot_id = snapshot_id.value();
+  enriched.added_files_count = added_files;
+  enriched.existing_files_count = existing_files;
+  enriched.deleted_files_count = deleted_files;
+  enriched.added_rows_count = added_rows;
+  enriched.existing_rows_count = existing_rows;
+  enriched.deleted_rows_count = deleted_rows;
+  enriched.partitions = std::move(partition_summaries);
+  enriched.first_row_id = std::nullopt;
+  return enriched;
+}
+
+}  // anonymous namespace
+
+SnapshotUpdate::~SnapshotUpdate() = default;
+
+SnapshotUpdate::SnapshotUpdate(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)),
+      can_inherit_snapshot_id_(
+          base().format_version > 1 ||
+          
base().properties.Get(TableProperties::kSnapshotIdInheritanceEnabled)),
+      commit_uuid_(Uuid::GenerateV7().ToString()),
+      target_manifest_size_bytes_(
+          base().properties.Get(TableProperties::kManifestTargetSizeBytes)) {}
+
+// TODO(xxx): write manifests in parallel
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDataManifests(
+    const std::vector<std::shared_ptr<DataFile>>& data_files,
+    const std::shared_ptr<PartitionSpec>& spec,
+    std::optional<int64_t> data_sequence_number) {
+  if (data_files.empty()) {
+    return std::vector<ManifestFile>{};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
+  RollingManifestWriter rolling_writer(
+      [this, spec, schema = std::move(current_schema),
+       snapshot_id = SnapshotId()]() -> 
Result<std::unique_ptr<ManifestWriter>> {
+        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
+                                          ManifestPath(), 
transaction_->table()->io(),
+                                          std::move(spec), std::move(schema),
+                                          ManifestContent::kData,
+                                          /*first_row_id=*/base().next_row_id);
+      },
+      target_manifest_size_bytes_);
+
+  for (const auto& file : data_files) {
+    ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file, 
data_sequence_number));
+  }
+  ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+  return rolling_writer.ToManifestFiles();
+}
+
+// TODO(xxx): write manifests in parallel
+Result<std::vector<ManifestFile>> SnapshotUpdate::WriteDeleteManifests(
+    const std::vector<std::shared_ptr<DataFile>>& delete_files,
+    const std::shared_ptr<PartitionSpec>& spec) {
+  if (delete_files.empty()) {
+    return std::vector<ManifestFile>{};
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, base().Schema());
+  RollingManifestWriter rolling_writer(
+      [this, spec, schema = std::move(current_schema),
+       snapshot_id = SnapshotId()]() -> 
Result<std::unique_ptr<ManifestWriter>> {
+        return ManifestWriter::MakeWriter(base().format_version, snapshot_id,
+                                          ManifestPath(), 
transaction_->table()->io(),
+                                          std::move(spec), std::move(schema),
+                                          ManifestContent::kDeletes);
+      },
+      target_manifest_size_bytes_);
+
+  for (const auto& file : delete_files) {
+    /// FIXME: Java impl wrap it with `PendingDeleteFile` and deals with
+    /// file->data_sequenece_number
+    ICEBERG_RETURN_UNEXPECTED(rolling_writer.WriteAddedEntry(file));
+  }
+  ICEBERG_RETURN_UNEXPECTED(rolling_writer.Close());
+  return rolling_writer.ToManifestFiles();
+}
+
+int64_t SnapshotUpdate::SnapshotId() {
+  if (!snapshot_id_.has_value()) {
+    snapshot_id_ = SnapshotUtil::GenerateSnapshotId(base());
+  }
+  return snapshot_id_.value();
+}
+
+Result<SnapshotUpdate::ApplyResult> SnapshotUpdate::Apply() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  ICEBERG_ASSIGN_OR_RAISE(auto parent_snapshot,
+                          SnapshotUtil::OptionalLatestSnapshot(base(), 
target_branch_));
+
+  int64_t sequence_number = base().NextSequenceNumber();
+  std::optional<int64_t> parent_snapshot_id =
+      parent_snapshot ? std::make_optional(parent_snapshot->snapshot_id) : 
std::nullopt;
+
+  if (parent_snapshot) {
+    ICEBERG_RETURN_UNEXPECTED(Validate(base(), parent_snapshot));
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto manifests, Apply(base(), parent_snapshot));
+  for (auto& manifest : manifests) {
+    if (manifest.added_snapshot_id != kInvalidSnapshotId) {
+      continue;
+    }
+    // TODO(xxx): read in parallel and cache enriched manifests for retries
+    ICEBERG_ASSIGN_OR_RAISE(manifest,
+                            AddMetadata(manifest, transaction_->table()->io(), 
base()));
+  }
+
+  std::string manifest_list_path = ManifestListPath();
+  manifest_lists_.push_back(manifest_list_path);
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto writer, ManifestListWriter::MakeWriter(base().format_version, 
SnapshotId(),
+                                                  parent_snapshot_id, 
manifest_list_path,
+                                                  transaction_->table()->io(),
+                                                  sequence_number, 
base().next_row_id));
+  ICEBERG_RETURN_UNEXPECTED(writer->AddAll(manifests));
+  ICEBERG_RETURN_UNEXPECTED(writer->Close());
+
+  std::optional<int64_t> next_row_id;
+  std::optional<int64_t> assigned_rows;
+  if (base().format_version >= 3) {
+    ICEBERG_CHECK(writer->next_row_id().has_value(),
+                  "row id is required by format version >= 3");
+    next_row_id = base().next_row_id;
+    assigned_rows = writer->next_row_id().value() - base().next_row_id;
+  }
+
+  std::string op = operation();
+  ICEBERG_CHECK(!op.empty(), "Snapshot operation cannot be empty");
+
+  if (op == DataOperation::kReplace) {
+    const auto summary = Summary();
+    auto added_records_it = summary.find(SnapshotSummaryFields::kAddedRecords);
+    auto replaced_records_it = 
summary.find(SnapshotSummaryFields::kDeletedRecords);
+    if (added_records_it != summary.cend() && replaced_records_it != 
summary.cend()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto added_records,
+                              
StringUtils::ParseInt<int64_t>(added_records_it->second));
+      ICEBERG_ASSIGN_OR_RAISE(auto replaced_records, 
StringUtils::ParseInt<int64_t>(
+                                                         
replaced_records_it->second));
+      ICEBERG_PRECHECK(
+          added_records <= replaced_records,
+          "Invalid REPLACE operation: {} added records > {} replaced records",
+          added_records, replaced_records);
+    }
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto summary, ComputeSummary(base()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      staged_snapshot_,
+      Snapshot::Make(sequence_number, SnapshotId(), parent_snapshot_id,
+                     CurrentTimePointMs(), std::move(op), std::move(summary),
+                     base().current_schema_id, std::move(manifest_list_path), 
next_row_id,
+                     assigned_rows));
+
+  return ApplyResult{.snapshot = staged_snapshot_,
+                     .target_branch = target_branch_,
+                     .stage_only = stage_only_};
+}
+
+Status SnapshotUpdate::Finalize(std::optional<Error> commit_error) {
+  if (commit_error.has_value()) {
+    if (commit_error->kind == ErrorKind::kCommitStateUnknown) {
+      return {};
+    }
+    CleanAll();
+    return {};
+  }
+
+  if (CleanupAfterCommit()) {
+    ICEBERG_CHECK(staged_snapshot_ != nullptr,
+                  "Staged snapshot is null during finalize after commit");
+    auto cached_snapshot = SnapshotCache(staged_snapshot_.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests,
+                            
cached_snapshot.Manifests(transaction_->table()->io()));
+    CleanUncommitted(manifests | std::views::transform([](const auto& 
manifest) {
+                       return manifest.manifest_path;
+                     }) |
+                     std::ranges::to<std::unordered_set<std::string>>());
+  }
+
+  // Also clean up unused manifest lists created by multiple attempts
+  for (const auto& manifest_list : manifest_lists_) {
+    if (manifest_list != staged_snapshot_->manifest_list) {
+      std::ignore = DeleteFile(manifest_list);
+    }
+  }
+
+  return {};
+}
+
+Status SnapshotUpdate::SetTargetBranch(const std::string& branch) {
+  ICEBERG_PRECHECK(!branch.empty(), "Branch name cannot be empty");
+
+  if (auto ref_it = base().refs.find(branch); ref_it != base().refs.end()) {
+    ICEBERG_PRECHECK(
+        ref_it->second->type() == SnapshotRefType::kBranch,
+        "{} is a tag, not a branch. Tags cannot be targets for producing 
snapshots",
+        branch);
+  }
+
+  target_branch_ = branch;
+  return {};
+}
+
+Result<std::unordered_map<std::string, std::string>> 
SnapshotUpdate::ComputeSummary(
+    const TableMetadata& previous) {
+  std::unordered_map<std::string, std::string> summary = Summary();
+  if (summary.empty()) {
+    return summary;
+  }
+
+  // Get previous summary from the target branch
+  std::unordered_map<std::string, std::string> previous_summary;
+  if (auto ref_it = previous.refs.find(target_branch_); ref_it != 
previous.refs.end()) {
+    if (auto snap_it = previous.SnapshotById(ref_it->second->snapshot_id);
+        snap_it.has_value()) {
+      previous_summary = snap_it.value()->summary;
+    }
+  } else {
+    // if there was no previous snapshot, default the summary to start totals 
at 0
+    previous_summary[SnapshotSummaryFields::kTotalRecords] = "0";
+    previous_summary[SnapshotSummaryFields::kTotalFileSize] = "0";
+    previous_summary[SnapshotSummaryFields::kTotalDataFiles] = "0";
+    previous_summary[SnapshotSummaryFields::kTotalDeleteFiles] = "0";
+    previous_summary[SnapshotSummaryFields::kTotalPosDeletes] = "0";
+    previous_summary[SnapshotSummaryFields::kTotalEqDeletes] = "0";
+  }
+
+  // Update totals
+  ICEBERG_RETURN_UNEXPECTED(UpdateTotal(
+      summary, previous_summary, SnapshotSummaryFields::kTotalRecords,
+      SnapshotSummaryFields::kAddedRecords, 
SnapshotSummaryFields::kDeletedRecords));
+  ICEBERG_RETURN_UNEXPECTED(UpdateTotal(
+      summary, previous_summary, SnapshotSummaryFields::kTotalFileSize,
+      SnapshotSummaryFields::kAddedFileSize, 
SnapshotSummaryFields::kRemovedFileSize));
+  ICEBERG_RETURN_UNEXPECTED(UpdateTotal(
+      summary, previous_summary, SnapshotSummaryFields::kTotalDataFiles,
+      SnapshotSummaryFields::kAddedDataFiles, 
SnapshotSummaryFields::kDeletedDataFiles));
+  ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary,
+                                        
SnapshotSummaryFields::kTotalDeleteFiles,
+                                        
SnapshotSummaryFields::kAddedDeleteFiles,
+                                        
SnapshotSummaryFields::kRemovedDeleteFiles));
+  ICEBERG_RETURN_UNEXPECTED(UpdateTotal(summary, previous_summary,
+                                        
SnapshotSummaryFields::kTotalPosDeletes,
+                                        
SnapshotSummaryFields::kAddedPosDeletes,
+                                        
SnapshotSummaryFields::kRemovedPosDeletes));
+  ICEBERG_RETURN_UNEXPECTED(UpdateTotal(
+      summary, previous_summary, SnapshotSummaryFields::kTotalEqDeletes,
+      SnapshotSummaryFields::kAddedEqDeletes, 
SnapshotSummaryFields::kRemovedEqDeletes));
+
+  // TODO(xxx): add custom summary fields like engine info
+  return summary;
+}
+
+void SnapshotUpdate::CleanAll() {
+  for (const auto& manifest_list : manifest_lists_) {
+    std::ignore = DeleteFile(manifest_list);
+  }
+  manifest_lists_.clear();
+  CleanUncommitted(std::unordered_set<std::string>{});
+}
+
+Status SnapshotUpdate::DeleteFile(const std::string& path) {
+  static const auto kDefaultDeleteFunc = [this](const std::string& path) {
+    return this->transaction_->table()->io()->DeleteFile(path);
+  };
+  if (delete_func_) {
+    return delete_func_(path);
+  } else {
+    return kDefaultDeleteFunc(path);
+  }
+}
+
+std::string SnapshotUpdate::ManifestListPath() {
+  // Generate manifest list path
+  // Format: {metadata_location}/snap-{snapshot_id}-{attempt}-{uuid}.avro
+  int64_t snapshot_id = SnapshotId();
+  std::string filename =
+      std::format("snap-{}-{}-{}.avro", snapshot_id, ++attempt_, commit_uuid_);
+  return transaction_->MetadataFileLocation(filename);
+}
+
+std::string SnapshotUpdate::ManifestPath() {
+  // Generate manifest path
+  // Format: {metadata_location}/{uuid}-m{manifest_count}.avro
+  std::string filename = std::format("{}-m{}.avro", commit_uuid_, 
manifest_count_++);
+  return transaction_->MetadataFileLocation(filename);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/snapshot_update.h 
b/src/iceberg/update/snapshot_update.h
new file mode 100644
index 00000000..48ef1676
--- /dev/null
+++ b/src/iceberg/update/snapshot_update.h
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <memory>
+#include <optional>
+#include <string>
+#include <unordered_map>
+#include <unordered_set>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+namespace iceberg {
+
+/// \brief Base class for operations that produce snapshots.
+///
+/// This class provides common functionality for creating new snapshots,
+/// including manifest list writing and cleanup.
+class ICEBERG_EXPORT SnapshotUpdate : public PendingUpdate {
+ public:
+  /// \brief Result of applying a snapshot update
+  struct ApplyResult {
+    std::shared_ptr<Snapshot> snapshot;
+    std::string target_branch;
+    bool stage_only = false;
+  };
+
+  ~SnapshotUpdate() override;
+
+  /// \brief Set a callback to delete files instead of the table's default.
+  ///
+  /// \param delete_func A function used to delete file locations
+  /// \return Reference to this for method chaining
+  /// \note Cannot be called more than once
+  auto& DeleteWith(this auto& self,
+                   std::function<Status(const std::string&)> delete_func) {
+    if (self.delete_func_) {
+      return self.AddError(ErrorKind::kInvalidArgument,
+                           "Cannot set delete callback more than once");
+    }
+    self.delete_func_ = std::move(delete_func);
+    return self;
+  }
+
+  /// \brief Stage a snapshot in table metadata, but not update the current 
snapshot id.
+  ///
+  /// \return Reference to this for method chaining
+  auto& StageOnly(this auto& self) {
+    self.stage_only_ = true;
+    return self;
+  }
+
+  /// \brief Apply the update's changes to create a new snapshot.
+  ///
+  /// This method validates the changes, applies them to the metadata,
+  /// and creates a new snapshot without committing it. The snapshot
+  /// is stored internally and can be accessed after Apply() succeeds.
+  ///
+  /// \return A result containing the new snapshot, or an error
+  Result<ApplyResult> Apply();
+
+  /// \brief Finalize the snapshot update, cleaning up any uncommitted files.
+  Status Finalize(std::optional<Error> commit_error) override;
+
+ protected:
+  explicit SnapshotUpdate(std::shared_ptr<Transaction> transaction);
+
+  /// \brief Write data manifests for the given data files
+  ///
+  /// \param data_files The data files to write
+  /// \param spec The partition spec to use
+  /// \param data_sequence_number Optional data sequence number for the files
+  /// \return A vector of manifest files
+  Result<std::vector<ManifestFile>> WriteDataManifests(
+      const std::vector<std::shared_ptr<DataFile>>& data_files,
+      const std::shared_ptr<PartitionSpec>& spec,
+      std::optional<int64_t> data_sequence_number = std::nullopt);
+
+  /// \brief Write delete manifests for the given delete files
+  ///
+  /// \param delete_files The delete files to write
+  /// \param spec The partition spec to use
+  /// \return A vector of manifest files
+  Result<std::vector<ManifestFile>> WriteDeleteManifests(
+      const std::vector<std::shared_ptr<DataFile>>& delete_files,
+      const std::shared_ptr<PartitionSpec>& spec);
+
+  Status SetTargetBranch(const std::string& branch);
+  const std::string& target_branch() const { return target_branch_; }
+  bool can_inherit_snapshot_id() const { return can_inherit_snapshot_id_; }
+  const std::string& commit_uuid() const { return commit_uuid_; }
+  int32_t manifest_count() const { return manifest_count_; }
+  int32_t attempt() const { return attempt_; }
+  int64_t target_manifest_size_bytes() const { return 
target_manifest_size_bytes_; }
+
+  /// \brief Clean up any uncommitted manifests that were created.
+  ///
+  /// Manifests may not be committed if Apply is called multiple times
+  /// because a commit conflict has occurred. Implementations may keep
+  /// around manifests because the same changes will be made by both
+  /// Apply calls. This method instructs the implementation to clean up
+  /// those manifests and passes the paths of the manifests that were
+  /// actually committed.
+  ///
+  /// \param committed A set of manifest paths that were actually committed
+  virtual void CleanUncommitted(const std::unordered_set<std::string>& 
committed) = 0;
+
+  /// \brief A string that describes the action that produced the new snapshot.
+  ///
+  /// \return A string operation name
+  virtual std::string operation() = 0;
+
+  /// \brief Validate the current metadata.
+  ///
+  /// Child operations can override this to add custom validation.
+  ///
+  /// \param current_metadata Current table metadata to validate
+  /// \param snapshot Ending snapshot on the lineage which is being validated
+  virtual Status Validate(const TableMetadata& current_metadata,
+                          const std::shared_ptr<Snapshot>& snapshot) {
+    return {};
+  };
+
+  /// \brief Apply the update's changes to the given metadata and snapshot.
+  ///
+  /// \param metadata_to_update The base table metadata to apply changes to
+  /// \param snapshot Snapshot to apply the changes to
+  /// \return A vector of manifest files for the new snapshot
+  virtual Result<std::vector<ManifestFile>> Apply(
+      const TableMetadata& metadata_to_update,
+      const std::shared_ptr<Snapshot>& snapshot) = 0;
+
+  /// \brief Get the summary map for this operation.
+  ///
+  /// \return A map of summary properties
+  virtual std::unordered_map<std::string, std::string> Summary() = 0;
+
+  /// \brief Check if cleanup should happen after commit
+  ///
+  /// \return True if cleanup should happen after commit
+  virtual bool CleanupAfterCommit() const { return true; }
+
+  /// \brief Get or generate the snapshot ID for the new snapshot.
+  int64_t SnapshotId();
+
+ private:
+  /// \brief Returns the snapshot summary from the implementation and updates 
totals.
+  Result<std::unordered_map<std::string, std::string>> ComputeSummary(
+      const TableMetadata& previous);
+
+  /// \brief Clean up all uncommitted files
+  void CleanAll();
+
+  Status DeleteFile(const std::string& path);
+  std::string ManifestListPath();
+  std::string ManifestPath();
+
+ private:
+  const bool can_inherit_snapshot_id_{true};
+  const std::string commit_uuid_;
+  int32_t manifest_count_{0};
+  int32_t attempt_{0};
+  std::vector<std::string> manifest_lists_;
+  const int64_t target_manifest_size_bytes_;
+  std::optional<int64_t> snapshot_id_;
+  bool stage_only_{false};
+  std::function<Status(const std::string&)> delete_func_;
+  std::string target_branch_{SnapshotRef::kMainBranch};
+  std::shared_ptr<Snapshot> staged_snapshot_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/update_partition_spec.cc 
b/src/iceberg/update/update_partition_spec.cc
index ffea1e09..54c3dc60 100644
--- a/src/iceberg/update/update_partition_spec.cc
+++ b/src/iceberg/update/update_partition_spec.cc
@@ -47,11 +47,10 @@ Result<std::shared_ptr<UpdatePartitionSpec>> 
UpdatePartitionSpec::Make(
 
 UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction> 
transaction)
     : PendingUpdate(std::move(transaction)) {
-  const TableMetadata& base_metadata = transaction_->current();
-  format_version_ = base_metadata.format_version;
+  format_version_ = base().format_version;
 
   // Get the current/default partition spec
-  auto spec_result = base_metadata.PartitionSpec();
+  auto spec_result = base().PartitionSpec();
   if (!spec_result.has_value()) {
     AddError(spec_result.error());
     return;
@@ -59,15 +58,15 @@ 
UpdatePartitionSpec::UpdatePartitionSpec(std::shared_ptr<Transaction> transactio
   spec_ = std::move(spec_result.value());
 
   // Get the current schema
-  auto schema_result = base_metadata.Schema();
+  auto schema_result = base().Schema();
   if (!schema_result.has_value()) {
     AddError(schema_result.error());
     return;
   }
   schema_ = std::move(schema_result.value());
 
-  last_assigned_partition_id_ = std::max(base_metadata.last_partition_id,
-                                         
PartitionSpec::kLegacyPartitionDataIdStart - 1);
+  last_assigned_partition_id_ =
+      std::max(base().last_partition_id, 
PartitionSpec::kLegacyPartitionDataIdStart - 1);
   name_to_field_ = IndexSpecByName(*spec_);
   transform_to_field_ = IndexSpecByTransform(*spec_);
 
@@ -433,18 +432,16 @@ UpdatePartitionSpec::IndexSpecByTransform(const 
PartitionSpec& spec) {
 }
 
 void UpdatePartitionSpec::BuildHistoricalFieldsIndex() {
-  const TableMetadata& base_metadata = transaction_->current();
-
   // Count total fields across all specs to reserve capacity
   size_t total_fields = 0;
-  for (const auto& partition_spec : base_metadata.partition_specs) {
+  for (const auto& partition_spec : base().partition_specs) {
     total_fields += partition_spec->fields().size();
   }
   historical_fields_.reserve(total_fields);
 
   // Index all fields from all historical partition specs
   // Later specs override earlier ones for the same (source_id, transform) key
-  for (const auto& partition_spec : base_metadata.partition_specs) {
+  for (const auto& partition_spec : base().partition_specs) {
     for (const auto& field : partition_spec->fields()) {
       TransformKey key{field.source_id(), field.transform()->ToString()};
       historical_fields_.emplace(key, field);
diff --git a/src/iceberg/update/update_properties.cc 
b/src/iceberg/update/update_properties.cc
index ce809c43..fe49df81 100644
--- a/src/iceberg/update/update_properties.cc
+++ b/src/iceberg/update/update_properties.cc
@@ -19,10 +19,8 @@
 
 #include "iceberg/update/update_properties.h"
 
-#include <charconv>
 #include <cstdint>
 #include <memory>
-#include <system_error>
 
 #include "iceberg/metrics_config.h"
 #include "iceberg/result.h"
@@ -31,6 +29,7 @@
 #include "iceberg/transaction.h"
 #include "iceberg/util/error_collector.h"
 #include "iceberg/util/macros.h"
+#include "iceberg/util/string_util.h"
 
 namespace iceberg {
 
@@ -70,7 +69,7 @@ UpdateProperties& UpdateProperties::Remove(const std::string& 
key) {
 
 Result<UpdateProperties::ApplyResult> UpdateProperties::Apply() {
   ICEBERG_RETURN_UNEXPECTED(CheckErrors());
-  const auto& current_props = transaction_->current().properties.configs();
+  const auto& current_props = base().properties.configs();
   std::unordered_map<std::string, std::string> new_properties;
   std::vector<std::string> removals;
   for (const auto& [key, value] : current_props) {
@@ -85,15 +84,8 @@ Result<UpdateProperties::ApplyResult> 
UpdateProperties::Apply() {
 
   auto iter = new_properties.find(TableProperties::kFormatVersion.key());
   if (iter != new_properties.end()) {
-    int parsed_version = 0;
-    const auto& val = iter->second;
-    auto [ptr, ec] = std::from_chars(val.data(), val.data() + val.size(), 
parsed_version);
-
-    if (ec == std::errc::invalid_argument) {
-      return InvalidArgument("Invalid format version '{}': not a valid 
integer", val);
-    } else if (ec == std::errc::result_out_of_range) {
-      return InvalidArgument("Format version '{}' is out of range", val);
-    }
+    ICEBERG_ASSIGN_OR_RAISE(auto parsed_version,
+                            StringUtils::ParseInt<int32_t>(iter->second));
 
     if (parsed_version > TableMetadata::kSupportedTableFormatVersion) {
       return InvalidArgument(
@@ -105,7 +97,7 @@ Result<UpdateProperties::ApplyResult> 
UpdateProperties::Apply() {
     updates_.erase(TableProperties::kFormatVersion.key());
   }
 
-  if (auto schema = transaction_->current().Schema(); schema.has_value()) {
+  if (auto schema = base().Schema(); schema.has_value()) {
     ICEBERG_RETURN_UNEXPECTED(
         MetricsConfig::VerifyReferencedColumns(new_properties, 
*schema.value()));
   }
diff --git a/src/iceberg/update/update_schema.cc 
b/src/iceberg/update/update_schema.cc
index 0e81c4ad..327ee0ac 100644
--- a/src/iceberg/update/update_schema.cc
+++ b/src/iceberg/update/update_schema.cc
@@ -250,10 +250,8 @@ Result<std::shared_ptr<UpdateSchema>> UpdateSchema::Make(
 
 UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> transaction)
     : PendingUpdate(std::move(transaction)) {
-  const TableMetadata& base_metadata = transaction_->current();
-
   // Get the current schema
-  auto schema_result = base_metadata.Schema();
+  auto schema_result = base().Schema();
   if (!schema_result.has_value()) {
     AddError(schema_result.error());
     return;
@@ -261,7 +259,7 @@ UpdateSchema::UpdateSchema(std::shared_ptr<Transaction> 
transaction)
   schema_ = std::move(schema_result.value());
 
   // Initialize last_column_id from base metadata
-  last_column_id_ = base_metadata.last_column_id;
+  last_column_id_ = base().last_column_id;
 
   // Initialize identifier field names from the current schema
   auto identifier_names_result = schema_->IdentifierFieldNames();
diff --git a/src/iceberg/update/update_sort_order.cc 
b/src/iceberg/update/update_sort_order.cc
index e3e651d5..c5c7be32 100644
--- a/src/iceberg/update/update_sort_order.cc
+++ b/src/iceberg/update/update_sort_order.cc
@@ -19,7 +19,6 @@
 
 #include "iceberg/update/update_sort_order.h"
 
-#include <cstdint>
 #include <memory>
 #include <vector>
 
@@ -52,7 +51,7 @@ UpdateSortOrder& UpdateSortOrder::AddSortField(const 
std::shared_ptr<Term>& term
   ICEBERG_BUILDER_CHECK(term != nullptr, "Term cannot be null");
   ICEBERG_BUILDER_CHECK(term->is_unbound(), "Term must be unbound");
 
-  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, 
transaction_->current().Schema());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema, base().Schema());
   if (term->kind() == Term::Kind::kReference) {
     // kReference is treated as identity transform
     auto named_ref = internal::checked_pointer_cast<NamedReference>(term);
@@ -99,7 +98,7 @@ Result<std::shared_ptr<SortOrder>> UpdateSortOrder::Apply() {
     // The actual sort order ID will be assigned by TableMetadataBuilder when
     // the AddSortOrder update is applied.
     ICEBERG_ASSIGN_OR_RAISE(order, SortOrder::Make(/*sort_id=*/-1, 
sort_fields_));
-    ICEBERG_ASSIGN_OR_RAISE(auto schema, transaction_->current().Schema());
+    ICEBERG_ASSIGN_OR_RAISE(auto schema, base().Schema());
     ICEBERG_RETURN_UNEXPECTED(order->Validate(*schema));
   }
   return order;
diff --git a/src/iceberg/util/snapshot_util.cc 
b/src/iceberg/util/snapshot_util.cc
index 2ec36478..4395dc27 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -26,6 +26,7 @@
 #include "iceberg/util/macros.h"
 #include "iceberg/util/snapshot_util_internal.h"
 #include "iceberg/util/timepoint.h"
+#include "iceberg/util/uuid.h"
 
 namespace iceberg {
 
@@ -335,4 +336,28 @@ Result<std::shared_ptr<Snapshot>> 
SnapshotUtil::LatestSnapshot(
   return metadata.SnapshotById(it->second->snapshot_id);
 }
 
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::OptionalLatestSnapshot(
+    const TableMetadata& metadata, const std::string& branch) {
+  return LatestSnapshot(metadata, branch)
+      .or_else([](const auto& error) -> Result<std::shared_ptr<Snapshot>> {
+        if (error.kind == ErrorKind::kNotFound) {
+          return nullptr;
+        }
+        return std::unexpected<Error>(error);
+      });
+}
+
+int64_t SnapshotUtil::GenerateSnapshotId() {
+  auto uuid = Uuid::GenerateV7();
+  return (uuid.high_bits() ^ uuid.low_bits()) & 
std::numeric_limits<int64_t>::max();
+}
+
+int64_t SnapshotUtil::GenerateSnapshotId(const TableMetadata& metadata) {
+  auto snapshot_id = GenerateSnapshotId();
+  while (metadata.SnapshotById(snapshot_id).has_value()) {
+    snapshot_id = GenerateSnapshotId();
+  }
+  return snapshot_id;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util_internal.h 
b/src/iceberg/util/snapshot_util_internal.h
index 2b11168e..befa96fe 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -20,6 +20,7 @@
 #pragma once
 
 #include <functional>
+#include <memory>
 #include <optional>
 #include <string>
 #include <vector>
@@ -229,13 +230,13 @@ class ICEBERG_EXPORT SnapshotUtil {
 
   /// \brief Fetch the snapshot at the head of the given branch in the given 
table.
   ///
-  /// This method calls Table::current_snapshot() instead of using branch API 
for the main
+  /// This method calls TableMetadata::Snapshot() instead of using branch API 
for the main
   /// branch so that existing code still goes through the old code path to 
ensure
   /// backwards compatibility.
   ///
   /// \param table The table
   /// \param branch Branch name of the table (empty string means main branch)
-  /// \return The latest snapshot for the given branch
+  /// \return The latest snapshot for the given branch, or NotFoundError.
   static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const Table& table,
                                                           const std::string& 
branch);
 
@@ -251,10 +252,31 @@ class ICEBERG_EXPORT SnapshotUtil {
   /// \param metadata The table metadata
   /// \param branch Branch name of the table metadata (empty string means main
   /// branch)
-  /// \return The latest snapshot for the given branch
+  /// \return The latest snapshot for the given branch, or NotFoundError.
   static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const TableMetadata& 
metadata,
                                                           const std::string& 
branch);
 
+  /// \brief Fetch the snapshot at the head of the given branch in the given 
table.
+  ///
+  /// Like LatestSnapshot above except that nullptr is returned if snapshot 
does not
+  /// exist.
+  ///
+  /// \param metadata The table metadata
+  /// \param branch Branch name of the table metadata (empty string means main
+  /// branch)
+  /// \return The latest snapshot for the given branch, or nullptr if not 
found.
+  static Result<std::shared_ptr<Snapshot>> OptionalLatestSnapshot(
+      const TableMetadata& metadata, const std::string& branch);
+
+  /// \brief Generate a new snapshot ID.
+  static int64_t GenerateSnapshotId();
+
+  /// \brief Generate a new snapshot ID for the given metadata.
+  ///
+  /// \param metadata The table metadata
+  /// \return A new snapshot ID
+  static int64_t GenerateSnapshotId(const TableMetadata& metadata);
+
  private:
   /// \brief Helper function to traverse ancestors of a snapshot.
   ///
diff --git a/src/iceberg/util/string_util.h b/src/iceberg/util/string_util.h
index 0c9e89bc..dfedb4a7 100644
--- a/src/iceberg/util/string_util.h
+++ b/src/iceberg/util/string_util.h
@@ -20,10 +20,14 @@
 #pragma once
 
 #include <algorithm>
+#include <charconv>
 #include <ranges>
 #include <string>
+#include <string_view>
+#include <typeinfo>
 
 #include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
 
 namespace iceberg {
 
@@ -61,6 +65,20 @@ class ICEBERG_EXPORT StringUtils {
     }
     return count;
   }
+
+  template <typename T>
+  static Result<T> ParseInt(std::string_view str) {
+    T value = 0;
+    auto [ptr, ec] = std::from_chars(str.data(), str.data() + str.size(), 
value);
+    if (ec == std::errc::invalid_argument) [[unlikely]] {
+      return InvalidArgument("Failed to parse integer from string '{}': 
invalid argument",
+                             str);
+    } else if (ec == std::errc::result_out_of_range) [[unlikely]] {
+      return InvalidArgument("Failed to parse {} from string '{}': value out 
of range",
+                             typeid(T).name(), str);
+    }
+    return value;
+  }
 };
 
 /// \brief Transparent hash function that supports std::string_view as lookup 
key
diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc
index 0381e90a..ed52dddf 100644
--- a/src/iceberg/util/timepoint.cc
+++ b/src/iceberg/util/timepoint.cc
@@ -60,4 +60,11 @@ std::string FormatTimePointMs(TimePointMs time_point_ms) {
   return oss.str();
 }
 
+TimePointMs CurrentTimePointMs() {
+  auto now = std::chrono::system_clock::now();
+  auto duration_since_epoch = now.time_since_epoch();
+  return TimePointMs{
+      
std::chrono::duration_cast<std::chrono::milliseconds>(duration_since_epoch)};
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h
index 6052c94a..ed303e1f 100644
--- a/src/iceberg/util/timepoint.h
+++ b/src/iceberg/util/timepoint.h
@@ -49,4 +49,7 @@ ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs 
time_point_ns);
 /// \brief Returns a human-readable string representation of a TimePointMs
 ICEBERG_EXPORT std::string FormatTimePointMs(TimePointMs time_point_ms);
 
+/// \brief Returns a time point in milliseconds that represents the current 
system time
+ICEBERG_EXPORT TimePointMs CurrentTimePointMs();
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc
index 9322deb9..cc76095a 100644
--- a/src/iceberg/util/uuid.cc
+++ b/src/iceberg/util/uuid.cc
@@ -217,4 +217,16 @@ std::string Uuid::ToString() const {
       data_[15]);
 }
 
+int64_t Uuid::high_bits() const {
+  int64_t result;
+  std::memcpy(&result, data_.data(), 8);
+  return result;
+}
+
+int64_t Uuid::low_bits() const {
+  int64_t result;
+  std::memcpy(&result, data_.data() + 8, 8);
+  return result;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/util/uuid.h b/src/iceberg/util/uuid.h
index 64db7c5d..69ac10fc 100644
--- a/src/iceberg/util/uuid.h
+++ b/src/iceberg/util/uuid.h
@@ -78,6 +78,9 @@ class ICEBERG_EXPORT Uuid : public util::Formattable {
     return lhs.data_ == rhs.data_;
   }
 
+  int64_t high_bits() const;
+  int64_t low_bits() const;
+
  private:
   std::array<uint8_t, kLength> data_;
 };

Reply via email to