This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new f9f14493 feat: add update table stats (#511)
f9f14493 is described below

commit f9f1449306a2857bf8fbf0f8c1d1c3d6bc1b1700
Author: Feiyang Li <[email protected]>
AuthorDate: Wed Jan 21 17:57:55 2026 +0800

    feat: add update table stats (#511)
---
 src/iceberg/CMakeLists.txt                 |   1 +
 src/iceberg/json_internal.cc               |  29 ++++
 src/iceberg/meson.build                    |   1 +
 src/iceberg/table.cc                       |   8 ++
 src/iceberg/table.h                        |   4 +
 src/iceberg/table_metadata.cc              |  43 +++++-
 src/iceberg/table_metadata.h               |   3 +-
 src/iceberg/table_update.cc                |  80 +++++++++--
 src/iceberg/table_update.h                 |  49 +++++++
 src/iceberg/test/CMakeLists.txt            |   3 +-
 src/iceberg/test/json_internal_test.cc     |  49 +++++++
 src/iceberg/test/update_statistics_test.cc | 221 +++++++++++++++++++++++++++++
 src/iceberg/transaction.cc                 |  19 +++
 src/iceberg/transaction.h                  |   4 +
 src/iceberg/type_fwd.h                     |   1 +
 src/iceberg/update/meson.build             |   1 +
 src/iceberg/update/pending_update.h        |   1 +
 src/iceberg/update/update_statistics.cc    |  72 ++++++++++
 src/iceberg/update/update_statistics.h     |  78 ++++++++++
 19 files changed, 648 insertions(+), 19 deletions(-)

diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index e3db640e..788b903f 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -95,6 +95,7 @@ set(ICEBERG_SOURCES
     update/update_properties.cc
     update/update_schema.cc
     update/update_sort_order.cc
+    update/update_statistics.cc
     util/bucket_util.cc
     util/content_file_util.cc
     util/conversions.cc
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index e92d3dd5..a614a4b9 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -190,6 +190,8 @@ constexpr std::string_view kActionSetSnapshotRef = "set-snapshot-ref";
 constexpr std::string_view kActionSetProperties = "set-properties";
 constexpr std::string_view kActionRemoveProperties = "remove-properties";
 constexpr std::string_view kActionSetLocation = "set-location";
+constexpr std::string_view kActionSetStatistics = "set-statistics";
+constexpr std::string_view kActionRemoveStatistics = "remove-statistics";

 // TableUpdate field constants
 constexpr std::string_view kUUID = "uuid";
@@ -1423,6 +1425,22 @@ nlohmann::json ToJson(const TableUpdate& update) {
       json[kLocation] = u.location();
       break;
     }
+    case TableUpdate::Kind::kSetStatistics: {
+      const auto& u = internal::checked_cast<const table::SetStatistics&>(update);
+      json[kAction] = kActionSetStatistics;
+      if (u.statistics_file()) {
+        json[kStatistics] = ToJson(*u.statistics_file());
+      } else {
+        json[kStatistics] = nlohmann::json::value_t::null;
+      }
+      break;
+    }
+    case TableUpdate::Kind::kRemoveStatistics: {
+      const auto& u = internal::checked_cast<const table::RemoveStatistics&>(update);
+      json[kAction] = kActionRemoveStatistics;
+      json[kSnapshotId] = u.snapshot_id();
+      break;
+    }
   }
   return json;
 }
@@ -1601,6 +1619,17 @@ Result<std::unique_ptr<TableUpdate>> TableUpdateFromJson(const nlohmann::json& j
     ICEBERG_ASSIGN_OR_RAISE(auto location, GetJsonValue<std::string>(json, kLocation));
     return std::make_unique<table::SetLocation>(std::move(location));
   }
+  if (action == kActionSetStatistics) {
+    ICEBERG_ASSIGN_OR_RAISE(auto statistics_json,
+                            GetJsonValue<nlohmann::json>(json, kStatistics));
+    ICEBERG_ASSIGN_OR_RAISE(auto statistics_file,
+                            StatisticsFileFromJson(statistics_json));
+    return std::make_unique<table::SetStatistics>(std::move(statistics_file));
+  }
+  if (action == kActionRemoveStatistics) {
+    ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json, kSnapshotId));
+    return std::make_unique<table::RemoveStatistics>(snapshot_id);
+  }

   return JsonParseError("Unknown table update action: {}", action);
 }
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c50aa1b2..a1f88b36 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -113,6 +113,7 @@ iceberg_sources = files(
     'update/update_properties.cc',
     'update/update_schema.cc',
     'update/update_sort_order.cc',
+    'update/update_statistics.cc',
     'util/bucket_util.cc',
     'util/content_file_util.cc',
     'util/conversions.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 28ee285a..73acafd7 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -35,6 +35,7 @@
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
+#include "iceberg/update/update_statistics.h"
 #include "iceberg/util/macros.h"

 namespace iceberg {
@@ -206,6 +207,13 @@ Result<std::shared_ptr<FastAppend>> Table::NewFastAppend() {
   return transaction->NewFastAppend();
 }

+Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate,
+                                          /*auto_commit=*/true));
+  return transaction->NewUpdateStatistics();
+}
+
 Result<std::shared_ptr<StagedTable>> StagedTable::Make(
     TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
     std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 75cad6e1..77e9016f 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -152,6 +152,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this<Table> {
   /// changes.
   virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();

+  /// \brief Create a new UpdateStatistics to update the table statistics and commit the
+  /// changes.
+  virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
+
   /// \brief Create a new UpdateLocation to update the table location and commit the
   /// changes.
   virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 7e357bba..393b438a 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -45,6 +45,7 @@
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
 #include "iceberg/table_properties.h"
 #include "iceberg/table_update.h"
 #include "iceberg/util/checked_cast.h"
@@ -620,6 +621,8 @@ class TableMetadataBuilder::Impl {
   Status RemoveRef(const std::string& name);
   Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
   Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
+  Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
+  Status RemoveStatistics(int64_t snapshot_id);

   Result<std::unique_ptr<TableMetadata>> Build();

@@ -1173,6 +1176,38 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name,
   return {};
 }

+Status TableMetadataBuilder::Impl::SetStatistics(
+    std::shared_ptr<StatisticsFile> statistics_file) {
+  ICEBERG_PRECHECK(statistics_file != nullptr, "Cannot set null statistics file");
+
+  // Find and replace existing statistics for the same snapshot_id, or add new one
+  auto it = std::ranges::find_if(
+      metadata_.statistics,
+      [snapshot_id = statistics_file->snapshot_id](const auto& stat) {
+        return stat && stat->snapshot_id == snapshot_id;
+      });
+
+  if (it != metadata_.statistics.end()) {
+    *it = statistics_file;
+  } else {
+    metadata_.statistics.push_back(statistics_file);
+  }
+
+  changes_.push_back(std::make_unique<table::SetStatistics>(std::move(statistics_file)));
+  return {};
+}
+
+Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
+  auto removed_count =
+      std::erase_if(metadata_.statistics, [snapshot_id](const auto& stat) {
+        return stat && stat->snapshot_id == snapshot_id;
+      });
+  if (removed_count != 0) {
+    changes_.push_back(std::make_unique<table::RemoveStatistics>(snapshot_id));
+  }
+  return {};
+}
+
 std::unordered_set<int64_t> TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
     int64_t current_snapshot_id) const {
   std::unordered_set<int64_t> added_snapshot_ids;
@@ -1589,12 +1624,14 @@ TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() {
 }

 TableMetadataBuilder& TableMetadataBuilder::SetStatistics(
-    const std::shared_ptr<StatisticsFile>& statistics_file) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+    std::shared_ptr<StatisticsFile> statistics_file) {
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetStatistics(std::move(statistics_file)));
+  return *this;
 }

 TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id) {
-  throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+  ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveStatistics(snapshot_id));
+  return *this;
 }

 TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index 6631b9d2..cfae0ce2 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -398,8 +398,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public ErrorCollector {
   ///
   /// \param statistics_file The statistics file to set
   /// \return Reference to this builder for method chaining
-  TableMetadataBuilder& SetStatistics(
-      const std::shared_ptr<StatisticsFile>& statistics_file);
+  TableMetadataBuilder& SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);

   /// \brief Remove table statistics by snapshot ID
   ///
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 7a01bdee..5866b61d 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -22,8 +22,10 @@
 #include "iceberg/exception.h"
 #include "iceberg/schema.h"
 #include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/table_requirements.h"
+#include "iceberg/util/checked_cast.h"

 namespace iceberg {
 TableUpdate::~TableUpdate() = default;
@@ -45,7 +47,7 @@ bool AssignUUID::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kAssignUUID) {
     return false;
   }
-  const auto& other_assign = static_cast<const AssignUUID&>(other);
+  const auto& other_assign = internal::checked_cast<const AssignUUID&>(other);
   return uuid_ == other_assign.uuid_;
 }

@@ -67,7 +69,7 @@ bool UpgradeFormatVersion::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kUpgradeFormatVersion) {
     return false;
   }
-  const auto& other_upgrade = static_cast<const UpgradeFormatVersion&>(other);
+  const auto& other_upgrade = internal::checked_cast<const UpgradeFormatVersion&>(other);
   return format_version_ == other_upgrade.format_version_;
 }

@@ -117,7 +119,7 @@ bool SetCurrentSchema::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kSetCurrentSchema) {
     return false;
   }
-  const auto& other_set = static_cast<const SetCurrentSchema&>(other);
+  const auto& other_set = internal::checked_cast<const SetCurrentSchema&>(other);
   return schema_id_ == other_set.schema_id_;
 }

@@ -167,7 +169,7 @@ bool SetDefaultPartitionSpec::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kSetDefaultPartitionSpec) {
     return false;
   }
-  const auto& other_set = static_cast<const SetDefaultPartitionSpec&>(other);
+  const auto& other_set = internal::checked_cast<const SetDefaultPartitionSpec&>(other);
   return spec_id_ == other_set.spec_id_;
 }

@@ -190,7 +192,7 @@ bool RemovePartitionSpecs::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kRemovePartitionSpecs) {
     return false;
   }
-  const auto& other_remove = static_cast<const RemovePartitionSpecs&>(other);
+  const auto& other_remove = internal::checked_cast<const RemovePartitionSpecs&>(other);
   return spec_ids_ == other_remove.spec_ids_;
 }

@@ -213,7 +215,7 @@ bool RemoveSchemas::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kRemoveSchemas) {
     return false;
   }
-  const auto& other_remove = static_cast<const RemoveSchemas&>(other);
+  const auto& other_remove = internal::checked_cast<const RemoveSchemas&>(other);
   return schema_ids_ == other_remove.schema_ids_;
 }

@@ -263,7 +265,7 @@ bool SetDefaultSortOrder::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kSetDefaultSortOrder) {
     return false;
   }
-  const auto& other_set = static_cast<const SetDefaultSortOrder&>(other);
+  const auto& other_set = internal::checked_cast<const SetDefaultSortOrder&>(other);
   return sort_order_id_ == other_set.sort_order_id_;
 }

@@ -313,7 +315,7 @@ bool RemoveSnapshots::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kRemoveSnapshots) {
     return false;
   }
-  const auto& other_remove = static_cast<const RemoveSnapshots&>(other);
+  const auto& other_remove = internal::checked_cast<const RemoveSnapshots&>(other);
   return snapshot_ids_ == other_remove.snapshot_ids_;
 }

@@ -335,7 +337,7 @@ bool RemoveSnapshotRef::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kRemoveSnapshotRef) {
     return false;
   }
-  const auto& other_remove = static_cast<const RemoveSnapshotRef&>(other);
+  const auto& other_remove = internal::checked_cast<const RemoveSnapshotRef&>(other);
   return ref_name_ == other_remove.ref_name_;
 }

@@ -366,7 +368,7 @@ bool SetSnapshotRef::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kSetSnapshotRef) {
     return false;
   }
-  const auto& other_set = static_cast<const SetSnapshotRef&>(other);
+  const auto& other_set = internal::checked_cast<const SetSnapshotRef&>(other);
   return ref_name_ == other_set.ref_name_ && snapshot_id_ == other_set.snapshot_id_ &&
          type_ == other_set.type_ &&
          min_snapshots_to_keep_ == other_set.min_snapshots_to_keep_ &&
@@ -394,7 +396,7 @@ bool SetProperties::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kSetProperties) {
     return false;
   }
-  const auto& other_set = static_cast<const SetProperties&>(other);
+  const auto& other_set = internal::checked_cast<const SetProperties&>(other);
   return updated_ == other_set.updated_;
 }

@@ -416,7 +418,7 @@ bool RemoveProperties::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kRemoveProperties) {
     return false;
   }
-  const auto& other_remove = static_cast<const RemoveProperties&>(other);
+  const auto& other_remove = internal::checked_cast<const RemoveProperties&>(other);
   return removed_ == other_remove.removed_;
 }

@@ -438,7 +440,7 @@ bool SetLocation::Equals(const TableUpdate& other) const {
   if (other.kind() != Kind::kSetLocation) {
     return false;
   }
-  const auto& other_set = static_cast<const SetLocation&>(other);
+  const auto& other_set = internal::checked_cast<const SetLocation&>(other);
   return location_ == other_set.location_;
 }

@@ -446,4 +448,56 @@ std::unique_ptr<TableUpdate> SetLocation::Clone() const {
   return std::make_unique<SetLocation>(location_);
 }

+// SetStatistics
+
+int64_t SetStatistics::snapshot_id() const { return statistics_file_->snapshot_id; }
+
+void SetStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.SetStatistics(statistics_file_);
+}
+
+void SetStatistics::GenerateRequirements(TableUpdateContext& context) const {
+  // SetStatistics doesn't generate any requirements
+}
+
+bool SetStatistics::Equals(const TableUpdate& other) const {
+  if (other.kind() != Kind::kSetStatistics) {
+    return false;
+  }
+  const auto& other_set = internal::checked_cast<const SetStatistics&>(other);
+  if (!statistics_file_ != !other_set.statistics_file_) {
+    return false;
+  }
+  if (statistics_file_ && !(*statistics_file_ == *other_set.statistics_file_)) {
+    return false;
+  }
+  return true;
+}
+
+std::unique_ptr<TableUpdate> SetStatistics::Clone() const {
+  return std::make_unique<SetStatistics>(statistics_file_);
+}
+
+// RemoveStatistics
+
+void RemoveStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.RemoveStatistics(snapshot_id_);
+}
+
+void RemoveStatistics::GenerateRequirements(TableUpdateContext& context) const {
+  // RemoveStatistics doesn't generate any requirements
+}
+
+bool RemoveStatistics::Equals(const TableUpdate& other) const {
+  if (other.kind() != Kind::kRemoveStatistics) {
+    return false;
+  }
+  const auto& other_remove = internal::checked_cast<const RemoveStatistics&>(other);
+  return snapshot_id_ == other_remove.snapshot_id_;
+}
+
+std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
+  return std::make_unique<RemoveStatistics>(snapshot_id_);
+}
+
 }  // namespace iceberg::table
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 3c9c9dbb..5bbc243e 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -59,6 +59,8 @@ class ICEBERG_EXPORT TableUpdate {
     kSetProperties,
     kRemoveProperties,
     kSetLocation,
+    kSetStatistics,
+    kRemoveStatistics,
   };

   virtual ~TableUpdate();
@@ -509,6 +511,53 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate {
   std::string location_;
 };

+/// \brief Represents setting statistics for a snapshot
+class ICEBERG_EXPORT SetStatistics : public TableUpdate {
+ public:
+  explicit SetStatistics(std::shared_ptr<StatisticsFile> statistics_file)
+      : statistics_file_(std::move(statistics_file)) {}
+
+  int64_t snapshot_id() const;
+
+  const std::shared_ptr<StatisticsFile>& statistics_file() const {
+    return statistics_file_;
+  }
+
+  void ApplyTo(TableMetadataBuilder& builder) const override;
+
+  void GenerateRequirements(TableUpdateContext& context) const override;
+
+  Kind kind() const override { return Kind::kSetStatistics; }
+
+  bool Equals(const TableUpdate& other) const override;
+
+  std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+  std::shared_ptr<StatisticsFile> statistics_file_;
+};
+
+/// \brief Represents removing statistics for a snapshot
+class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
+ public:
+  explicit RemoveStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {}
+
+  int64_t snapshot_id() const { return snapshot_id_; }
+
+  void ApplyTo(TableMetadataBuilder& builder) const override;
+
+  void GenerateRequirements(TableUpdateContext& context) const override;
+
+  Kind kind() const override { return Kind::kRemoveStatistics; }
+
+  bool Equals(const TableUpdate& other) const override;
+
+  std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+  int64_t snapshot_id_;
+};
+
 }  // namespace table

 }  // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 62864cd6..1bd2fd6a 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -180,7 +180,8 @@ if(ICEBERG_BUILD_BUNDLE)
                    update_partition_spec_test.cc
                    update_properties_test.cc
                    update_schema_test.cc
-                   update_sort_order_test.cc)
+                   update_sort_order_test.cc
+                   update_statistics_test.cc)

   add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)

diff --git a/src/iceberg/test/json_internal_test.cc b/src/iceberg/test/json_internal_test.cc
index d6a171e0..bb167ad0 100644
--- a/src/iceberg/test/json_internal_test.cc
+++ b/src/iceberg/test/json_internal_test.cc
@@ -31,6 +31,7 @@
 #include "iceberg/snapshot.h"
 #include "iceberg/sort_field.h"
 #include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
 #include "iceberg/table_requirement.h"
 #include "iceberg/table_update.h"
 #include "iceberg/test/matchers.h"
@@ -564,6 +565,54 @@ TEST(JsonInternalTest, TableUpdateSetLocation) {
   EXPECT_EQ(*internal::checked_cast<table::SetLocation*>(parsed.value().get()), update);
 }

+TEST(JsonInternalTest, TableUpdateSetStatistics) {
+  auto stats_file = std::make_shared<StatisticsFile>();
+  stats_file->snapshot_id = 123456789;
+  stats_file->path = "s3://bucket/warehouse/table/metadata/stats-123456789.puffin";
+  stats_file->file_size_in_bytes = 1024;
+  stats_file->file_footer_size_in_bytes = 128;
+  stats_file->blob_metadata = {BlobMetadata{.type = "ndv",
+                                            .source_snapshot_id = 123456789,
+                                            .source_snapshot_sequence_number = 1,
+                                            .fields = {1, 2},
+                                            .properties = {{"prop1", "value1"}}}};
+
+  table::SetStatistics update(stats_file);
+  nlohmann::json expected = R"({
+    "action": "set-statistics",
+    "statistics": {
+      "snapshot-id": 123456789,
+      "statistics-path": "s3://bucket/warehouse/table/metadata/stats-123456789.puffin",
+      "file-size-in-bytes": 1024,
+      "file-footer-size-in-bytes": 128,
+      "blob-metadata": [{
+        "type": "ndv",
+        "snapshot-id": 123456789,
+        "sequence-number": 1,
+        "fields": [1, 2],
+        "properties": {"prop1": "value1"}
+      }]
+    }
+  })"_json;
+
+  EXPECT_EQ(ToJson(update), expected);
+  auto parsed = TableUpdateFromJson(expected);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(*internal::checked_cast<table::SetStatistics*>(parsed.value().get()), update);
+}
+
+TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
+  table::RemoveStatistics update(123456789);
+  nlohmann::json expected =
+      R"({"action":"remove-statistics","snapshot-id":123456789})"_json;
+
+  EXPECT_EQ(ToJson(update), expected);
+  auto parsed = TableUpdateFromJson(expected);
+  ASSERT_THAT(parsed, IsOk());
+  EXPECT_EQ(*internal::checked_cast<table::RemoveStatistics*>(parsed.value().get()),
+            update);
+}
+
 TEST(JsonInternalTest, TableUpdateUnknownAction) {
   nlohmann::json json = R"({"action":"unknown-action"})"_json;
   auto result = TableUpdateFromJson(json);
diff --git a/src/iceberg/test/update_statistics_test.cc b/src/iceberg/test/update_statistics_test.cc
new file mode 100644
index 00000000..7b9d4f58
--- /dev/null
+++ b/src/iceberg/test/update_statistics_test.cc
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_statistics.h"
+
+#include <algorithm>
+#include <memory>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+
+namespace iceberg {
+
+class UpdateStatisticsTest : public UpdateTestBase {
+ protected:
+  // Helper function to create a statistics file
+  std::shared_ptr<StatisticsFile> MakeStatisticsFile(int64_t snapshot_id,
+                                                     const std::string& path,
+                                                     int64_t file_size = 1024,
+                                                     int64_t footer_size = 128) {
+    auto stats_file = std::make_shared<StatisticsFile>();
+    stats_file->snapshot_id = snapshot_id;
+    stats_file->path = path;
+    stats_file->file_size_in_bytes = file_size;
+    stats_file->file_footer_size_in_bytes = footer_size;
+
+    BlobMetadata blob;
+    blob.type = "apache-datasketches-theta-v1";
+    blob.source_snapshot_id = snapshot_id;
+    blob.source_snapshot_sequence_number = 1;
+    blob.fields = {1, 2};
+    blob.properties = {{"ndv", "100"}};
+    stats_file->blob_metadata.push_back(blob);
+
+    return stats_file;
+  }
+
+  // Helper to find statistics file by snapshot_id in the result vector
+  std::shared_ptr<StatisticsFile> FindStatistics(
+      const std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>>& to_set,
+      int64_t snapshot_id) {
+    auto it = std::ranges::find_if(
+        to_set, [snapshot_id](const auto& p) { return p.first == snapshot_id; });
+    return it != to_set.end() ? it->second : nullptr;
+  }
+};
+
+TEST_F(UpdateStatisticsTest, EmptyUpdate) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());
+  EXPECT_TRUE(result.to_remove.empty());
+}
+
+TEST_F(UpdateStatisticsTest, SetStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  auto stats_file =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+  update->SetStatistics(stats_file);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 1);
+  EXPECT_TRUE(result.to_remove.empty());
+  EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file);
+}
+
+TEST_F(UpdateStatisticsTest, SetMultipleStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  auto stats_file_1 =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+  auto stats_file_2 =
+      MakeStatisticsFile(2, "/warehouse/test_table/metadata/stats-2.puffin");
+
+  update->SetStatistics(stats_file_1).SetStatistics(stats_file_2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 2);
+  EXPECT_TRUE(result.to_remove.empty());
+  EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file_1);
+  EXPECT_EQ(FindStatistics(result.to_set, 2), stats_file_2);
+}
+
+TEST_F(UpdateStatisticsTest, RemoveStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  update->RemoveStatistics(1);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());
+  EXPECT_EQ(result.to_remove.size(), 1);
+  EXPECT_THAT(result.to_remove, ::testing::Contains(1));
+}
+
+TEST_F(UpdateStatisticsTest, RemoveMultipleStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  update->RemoveStatistics(1).RemoveStatistics(2).RemoveStatistics(3);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());
+  EXPECT_EQ(result.to_remove.size(), 3);
+  EXPECT_THAT(result.to_remove, ::testing::UnorderedElementsAre(1, 2, 3));
+}
+
+TEST_F(UpdateStatisticsTest, SetAndRemoveDifferentSnapshots) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  auto stats_file =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+
+  update->SetStatistics(stats_file).RemoveStatistics(2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 1);
+  EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file);
+  EXPECT_EQ(result.to_remove.size(), 1);
+  EXPECT_THAT(result.to_remove, ::testing::Contains(2));
+}
+
+TEST_F(UpdateStatisticsTest, ReplaceStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  auto stats_file_1 =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1a.puffin");
+  auto stats_file_2 =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1b.puffin", 2048, 256);
+
+  // Set statistics for snapshot 1, then replace it
+  update->SetStatistics(stats_file_1).SetStatistics(stats_file_2);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 1);
+  EXPECT_TRUE(result.to_remove.empty());
+  // Should have the second one (replacement)
+  EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file_2);
+  EXPECT_NE(FindStatistics(result.to_set, 1), stats_file_1);
+}
+
+TEST_F(UpdateStatisticsTest, SetThenRemoveSameSnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  auto stats_file =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+
+  // Set statistics for snapshot 1, then remove it
+  update->SetStatistics(stats_file).RemoveStatistics(1);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_TRUE(result.to_set.empty());
+  EXPECT_EQ(result.to_remove.size(), 1);
+  EXPECT_THAT(result.to_remove, ::testing::Contains(1));
+}
+
+TEST_F(UpdateStatisticsTest, RemoveThenSetSameSnapshot) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+  auto stats_file =
+      MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+
+  // Remove statistics for snapshot 1, then set new ones
+  update->RemoveStatistics(1).SetStatistics(stats_file);
+
+  ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+  EXPECT_EQ(result.to_set.size(), 1);
+  EXPECT_TRUE(result.to_remove.empty());
+  EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file);
+}
+
+TEST_F(UpdateStatisticsTest, SetNullStatistics) {
+  ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+
+  update->SetStatistics(nullptr);
+
+  auto result = update->Apply();
+  EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+  EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null"));
+}
+
+TEST_F(UpdateStatisticsTest, CommitSuccess) {
+  // Test empty commit
+  ICEBERG_UNWRAP_OR_FAIL(auto empty_update, table_->NewUpdateStatistics());
+  EXPECT_THAT(empty_update->Commit(), IsOk());
+
+  // Reload table after first commit
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+
+  // Get a valid snapshot ID from the table
+  ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
+  int64_t snapshot_id = current_snapshot->snapshot_id;
+
+  // Test commit with statistics changes
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateStatistics());
+  auto stats_file =
+      MakeStatisticsFile(snapshot_id, "/warehouse/test_table/metadata/stats-1.puffin");
+  update->SetStatistics(stats_file);
+
+  EXPECT_THAT(update->Commit(), IsOk());
+
+  // Verify the statistics were committed and persisted
+  ICEBERG_UNWRAP_OR_FAIL(auto final_table, catalog_->LoadTable(table_ident_));
+  const auto& statistics = final_table->metadata()->statistics;
+  EXPECT_EQ(statistics.size(), 1);
+  EXPECT_EQ(*statistics[0], *stats_file);
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 0f950b02..7bd4a577 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -25,6 +25,7 @@
 #include "iceberg/catalog.h"
 #include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
+#include "iceberg/statistics_file.h"
 #include "iceberg/table.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/table_properties.h"
@@ -41,6 +42,7 @@
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
 #include "iceberg/update/update_sort_order.h"
+#include "iceberg/update/update_statistics.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/location_util.h"
 #include "iceberg/util/macros.h"
@@ -198,6 +200,16 @@ Status Transaction::Apply(PendingUpdate& update) {
         metadata_builder_->AssignUUID();
       }
     } break;
+    case PendingUpdate::Kind::kUpdateStatistics: {
+      auto& update_statistics = 
internal::checked_cast<UpdateStatistics&>(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
+      for (auto&& [_, stat_file] : result.to_set) {
+        metadata_builder_->SetStatistics(std::move(stat_file));
+      }
+      for (const auto& snapshot_id : result.to_remove) {
+        metadata_builder_->RemoveStatistics(snapshot_id);
+      }
+    } break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast<int32_t>(update.kind()));
@@ -316,4 +328,11 @@ Result<std::shared_ptr<FastAppend>> Transaction::NewFastAppend() {
   return fast_append;
 }
 
+// Creates an UpdateStatistics bound to this transaction and hands it to
+// AddUpdate(); the update is returned only if that registration succeeds.
+Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
+  ICEBERG_ASSIGN_OR_RAISE(auto pending, UpdateStatistics::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(pending));
+  return pending;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 23b01fb1..6d7816ee 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
   /// changes.
   Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
 
+  /// \brief Create a new UpdateStatistics to update table statistics and commit the
+  /// changes.
+  Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
+
   /// \brief Create a new UpdateLocation to update the table location and commit the
   /// changes.
   Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 40736419..2e099c1b 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -199,6 +199,7 @@ class UpdatePartitionSpec;
 class UpdateProperties;
 class UpdateSchema;
 class UpdateSortOrder;
+class UpdateStatistics;
 
 /// ----------------------------------------------------------------------------
 /// TODO: Forward declarations below are not added yet.
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 2bbd1037..8dc92c00 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -27,6 +27,7 @@ install_headers(
         'update_schema.h',
         'update_sort_order.h',
         'update_properties.h',
+        'update_statistics.h',
     ],
     subdir: 'iceberg/update',
 )
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index 68c2d205..0c0b6e3e 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -50,6 +50,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
     kUpdateSchema,
     kUpdateSnapshot,
     kUpdateSortOrder,
+    kUpdateStatistics,
   };
 
   /// \brief Return the kind of this pending update.
diff --git a/src/iceberg/update/update_statistics.cc b/src/iceberg/update/update_statistics.cc
new file mode 100644
index 00000000..46145336
--- /dev/null
+++ b/src/iceberg/update/update_statistics.cc
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_statistics.h"
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<UpdateStatistics>> UpdateStatistics::Make(
+    std::shared_ptr<Transaction> transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create UpdateStatistics without a transaction");
+  return std::shared_ptr<UpdateStatistics>(new 
UpdateStatistics(std::move(transaction)));
+}
+
+// Ownership of the transaction is passed straight through to the PendingUpdate base.
+UpdateStatistics::UpdateStatistics(std::shared_ptr<Transaction> txn)
+    : PendingUpdate(std::move(txn)) {}
+
+// Members clean up after themselves; nothing extra to release.
+UpdateStatistics::~UpdateStatistics() = default;
+
+// Stages `statistics_file` for the snapshot it references, replacing whatever was
+// staged earlier (set or remove) for that snapshot.
+UpdateStatistics& UpdateStatistics::SetStatistics(
+    std::shared_ptr<StatisticsFile> statistics_file) {
+  // A null file is reported as an error by Apply() instead of being stored,
+  // because a null value in statistics_to_set_ means "remove".
+  ICEBERG_BUILDER_CHECK(statistics_file != nullptr, "Statistics file cannot be null");
+  // Read the key before ownership moves into the map, so the lookup never depends
+  // on the evaluation order of the assignment operands.
+  const int64_t snapshot_id = statistics_file->snapshot_id;
+  statistics_to_set_.insert_or_assign(snapshot_id, std::move(statistics_file));
+  return *this;
+}
+
+UpdateStatistics& UpdateStatistics::RemoveStatistics(int64_t snapshot_id) {
+  // A null entry is the removal marker; it overrides any file staged earlier
+  // for the same snapshot.
+  statistics_to_set_.insert_or_assign(snapshot_id, nullptr);
+  return *this;
+}
+
+Result<UpdateStatistics::ApplyResult> UpdateStatistics::Apply() {
+  // Surface any error recorded by the builder methods (e.g. a null statistics file).
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  ApplyResult changes;
+  for (const auto& entry : statistics_to_set_) {
+    // A null file marks the snapshot's statistics for removal.
+    if (entry.second == nullptr) {
+      changes.to_remove.push_back(entry.first);
+      continue;
+    }
+    changes.to_set.emplace_back(entry.first, entry.second);
+  }
+  return changes;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h
new file mode 100644
index 00000000..55e50fb1
--- /dev/null
+++ b/src/iceberg/update/update_statistics.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/update_statistics.h
+/// \brief Updates table statistics.
+
+namespace iceberg {
+
+/// \brief Updates table statistics.
+///
+/// Stages statistics files to set or remove, keyed by snapshot ID. This class
+/// does not write table metadata itself; Apply() only reports the staged changes.
+class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
+ public:
+  /// \brief Create an UpdateStatistics bound to a transaction.
+  ///
+  /// \param transaction The transaction that owns this update; must not be null
+  /// \return The new UpdateStatistics, or an error if transaction is null
+  static Result<std::shared_ptr<UpdateStatistics>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~UpdateStatistics() override;
+
+  /// \brief Set statistics file for a snapshot.
+  ///
+  /// Associates a statistics file with the snapshot ID stored in the file. If
+  /// statistics already exist for this snapshot, they will be replaced. A later
+  /// SetStatistics() or RemoveStatistics() call for the same snapshot overrides
+  /// this one. A null file is not staged; the failure is reported by Apply().
+  ///
+  /// \param statistics_file The statistics file to set
+  /// \return Reference to this UpdateStatistics for chaining
+  UpdateStatistics& SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
+
+  /// \brief Remove statistics for a snapshot.
+  ///
+  /// Marks the statistics for the given snapshot ID for removal, overriding any
+  /// file staged earlier for the same snapshot by SetStatistics().
+  ///
+  /// \param snapshot_id The snapshot ID whose statistics to remove
+  /// \return Reference to this UpdateStatistics for chaining
+  UpdateStatistics& RemoveStatistics(int64_t snapshot_id);
+
+  /// \brief Return the kind of this pending update.
+  Kind kind() const final { return Kind::kUpdateStatistics; }
+
+  /// \brief Changes staged by this update, as produced by Apply().
+  ///
+  /// A snapshot ID appears in at most one of the two lists. Entries are in no
+  /// particular order.
+  struct ApplyResult {
+    /// Snapshot IDs paired with the statistics file to set for each.
+    std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>> to_set;
+    /// Snapshot IDs whose statistics should be removed.
+    std::vector<int64_t> to_remove;
+  };
+
+  /// \brief Collect the staged changes.
+  ///
+  /// Does not modify table metadata; the caller applies the returned changes.
+  ///
+  /// \return The staged changes, or an error if a builder call (for example
+  /// SetStatistics() with a null file) recorded one
+  Result<ApplyResult> Apply();
+
+ private:
+  /// Use Make() to construct instances.
+  explicit UpdateStatistics(std::shared_ptr<Transaction> transaction);
+
+  /// Staged changes keyed by snapshot ID; a null file marks removal.
+  std::unordered_map<int64_t, std::shared_ptr<StatisticsFile>> statistics_to_set_;
+};
+
+}  // namespace iceberg


Reply via email to