This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new f9f14493 feat: add update table stats (#511)
f9f14493 is described below
commit f9f1449306a2857bf8fbf0f8c1d1c3d6bc1b1700
Author: Feiyang Li <[email protected]>
AuthorDate: Wed Jan 21 17:57:55 2026 +0800
feat: add update table stats (#511)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/json_internal.cc | 29 ++++
src/iceberg/meson.build | 1 +
src/iceberg/table.cc | 8 ++
src/iceberg/table.h | 4 +
src/iceberg/table_metadata.cc | 43 +++++-
src/iceberg/table_metadata.h | 3 +-
src/iceberg/table_update.cc | 80 +++++++++--
src/iceberg/table_update.h | 49 +++++++
src/iceberg/test/CMakeLists.txt | 3 +-
src/iceberg/test/json_internal_test.cc | 49 +++++++
src/iceberg/test/update_statistics_test.cc | 221 +++++++++++++++++++++++++++++
src/iceberg/transaction.cc | 19 +++
src/iceberg/transaction.h | 4 +
src/iceberg/type_fwd.h | 1 +
src/iceberg/update/meson.build | 1 +
src/iceberg/update/pending_update.h | 1 +
src/iceberg/update/update_statistics.cc | 72 ++++++++++
src/iceberg/update/update_statistics.h | 78 ++++++++++
19 files changed, 648 insertions(+), 19 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index e3db640e..788b903f 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -95,6 +95,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
+ update/update_statistics.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc
index e92d3dd5..a614a4b9 100644
--- a/src/iceberg/json_internal.cc
+++ b/src/iceberg/json_internal.cc
@@ -190,6 +190,8 @@ constexpr std::string_view kActionSetSnapshotRef =
"set-snapshot-ref";
constexpr std::string_view kActionSetProperties = "set-properties";
constexpr std::string_view kActionRemoveProperties = "remove-properties";
constexpr std::string_view kActionSetLocation = "set-location";
+constexpr std::string_view kActionSetStatistics = "set-statistics";
+constexpr std::string_view kActionRemoveStatistics = "remove-statistics";
// TableUpdate field constants
constexpr std::string_view kUUID = "uuid";
@@ -1423,6 +1425,22 @@ nlohmann::json ToJson(const TableUpdate& update) {
json[kLocation] = u.location();
break;
}
+ case TableUpdate::Kind::kSetStatistics: {
+ const auto& u = internal::checked_cast<const
table::SetStatistics&>(update);
+ json[kAction] = kActionSetStatistics;
+ if (u.statistics_file()) {
+ json[kStatistics] = ToJson(*u.statistics_file());
+ } else {
+ json[kStatistics] = nlohmann::json::value_t::null;
+ }
+ break;
+ }
+ case TableUpdate::Kind::kRemoveStatistics: {
+ const auto& u = internal::checked_cast<const
table::RemoveStatistics&>(update);
+ json[kAction] = kActionRemoveStatistics;
+ json[kSnapshotId] = u.snapshot_id();
+ break;
+ }
}
return json;
}
@@ -1601,6 +1619,17 @@ Result<std::unique_ptr<TableUpdate>>
TableUpdateFromJson(const nlohmann::json& j
ICEBERG_ASSIGN_OR_RAISE(auto location, GetJsonValue<std::string>(json,
kLocation));
return std::make_unique<table::SetLocation>(std::move(location));
}
+ if (action == kActionSetStatistics) {
+ ICEBERG_ASSIGN_OR_RAISE(auto statistics_json,
+ GetJsonValue<nlohmann::json>(json, kStatistics));
+ ICEBERG_ASSIGN_OR_RAISE(auto statistics_file,
+ StatisticsFileFromJson(statistics_json));
+ return std::make_unique<table::SetStatistics>(std::move(statistics_file));
+ }
+ if (action == kActionRemoveStatistics) {
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, GetJsonValue<int64_t>(json,
kSnapshotId));
+ return std::make_unique<table::RemoveStatistics>(snapshot_id);
+ }
return JsonParseError("Unknown table update action: {}", action);
}
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index c50aa1b2..a1f88b36 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -113,6 +113,7 @@ iceberg_sources = files(
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_sort_order.cc',
+ 'update/update_statistics.cc',
'util/bucket_util.cc',
'util/content_file_util.cc',
'util/conversions.cc',
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index 28ee285a..73acafd7 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -35,6 +35,7 @@
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
+#include "iceberg/update/update_statistics.h"
#include "iceberg/util/macros.h"
namespace iceberg {
@@ -206,6 +207,13 @@ Result<std::shared_ptr<FastAppend>> Table::NewFastAppend()
{
return transaction->NewFastAppend();
}
+// Opens a Transaction of kind kUpdate with auto_commit=true on this table and
+// delegates to Transaction::NewUpdateStatistics(). Committing the returned update
+// presumably commits that single-update transaction (auto_commit) — confirm in
+// Transaction.
+Result<std::shared_ptr<UpdateStatistics>> Table::NewUpdateStatistics() {
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto transaction, Transaction::Make(shared_from_this(),
Transaction::Kind::kUpdate,
+ /*auto_commit=*/true));
+ return transaction->NewUpdateStatistics();
+}
+
Result<std::shared_ptr<StagedTable>> StagedTable::Make(
TableIdentifier identifier, std::shared_ptr<TableMetadata> metadata,
std::string metadata_location, std::shared_ptr<FileIO> io,
diff --git a/src/iceberg/table.h b/src/iceberg/table.h
index 75cad6e1..77e9016f 100644
--- a/src/iceberg/table.h
+++ b/src/iceberg/table.h
@@ -152,6 +152,10 @@ class ICEBERG_EXPORT Table : public
std::enable_shared_from_this<Table> {
/// changes.
virtual Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
+ /// \brief Create a new UpdateStatistics to update the table statistics and
commit the
+ /// changes.
+ virtual Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
+
/// \brief Create a new UpdateLocation to update the table location and
commit the
/// changes.
virtual Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 7e357bba..393b438a 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -45,6 +45,7 @@
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
#include "iceberg/table_properties.h"
#include "iceberg/table_update.h"
#include "iceberg/util/checked_cast.h"
@@ -620,6 +621,8 @@ class TableMetadataBuilder::Impl {
Status RemoveRef(const std::string& name);
Status RemoveSnapshots(const std::vector<int64_t>& snapshot_ids);
Status RemovePartitionSpecs(const std::vector<int32_t>& spec_ids);
+ Status SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
+ Status RemoveStatistics(int64_t snapshot_id);
Result<std::unique_ptr<TableMetadata>> Build();
@@ -1173,6 +1176,38 @@ Status TableMetadataBuilder::Impl::SetRef(const
std::string& name,
return {};
}
+// Records `statistics_file` in metadata_.statistics and logs a
+// table::SetStatistics change. The first non-null entry with the same
+// snapshot_id is replaced in place; otherwise the file is appended, so at most
+// one entry per snapshot is maintained by this path. A null file is rejected by
+// ICEBERG_PRECHECK before anything is modified.
+Status TableMetadataBuilder::Impl::SetStatistics(
+ std::shared_ptr<StatisticsFile> statistics_file) {
+ ICEBERG_PRECHECK(statistics_file != nullptr, "Cannot set null statistics
file");
+
+ // Find and replace existing statistics for the same snapshot_id, or add new
one
+ auto it = std::ranges::find_if(
+ metadata_.statistics,
+ [snapshot_id = statistics_file->snapshot_id](const auto& stat) {
+ return stat && stat->snapshot_id == snapshot_id;
+ });
+
+ if (it != metadata_.statistics.end()) {
+ *it = statistics_file;
+ } else {
+ metadata_.statistics.push_back(statistics_file);
+ }
+
+ // The metadata keeps its own copy of the pointer above, so it is safe to move
+ // the parameter into the change log here.
+
changes_.push_back(std::make_unique<table::SetStatistics>(std::move(statistics_file)));
+ return {};
+}
+
+// Drops every non-null entry of metadata_.statistics whose snapshot_id equals
+// `snapshot_id`. A table::RemoveStatistics change is logged only when at least
+// one entry was erased, so removing unknown statistics is a silent no-op.
+Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) {
+ auto removed_count =
+ std::erase_if(metadata_.statistics, [snapshot_id](const auto& stat) {
+ return stat && stat->snapshot_id == snapshot_id;
+ });
+ if (removed_count != 0) {
+ changes_.push_back(std::make_unique<table::RemoveStatistics>(snapshot_id));
+ }
+ return {};
+}
+
std::unordered_set<int64_t>
TableMetadataBuilder::Impl::IntermediateSnapshotIdSet(
int64_t current_snapshot_id) const {
std::unordered_set<int64_t> added_snapshot_ids;
@@ -1589,12 +1624,14 @@ TableMetadataBuilder&
TableMetadataBuilder::SuppressHistoricalSnapshots() {
}
+// Public builder entry point; forwards to Impl::SetStatistics. On error,
+// ICEBERG_BUILDER_RETURN_IF_ERROR short-circuits (presumably recording the error
+// on this ErrorCollector — confirm in the macro) and *this is still returned.
TableMetadataBuilder& TableMetadataBuilder::SetStatistics(
- const std::shared_ptr<StatisticsFile>& statistics_file) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ std::shared_ptr<StatisticsFile> statistics_file) {
+
ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetStatistics(std::move(statistics_file)));
+ return *this;
}
+// Public builder entry point; forwards to Impl::RemoveStatistics and returns
+// *this for chaining.
TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t
snapshot_id) {
- throw IcebergError(std::format("{} not implemented", __FUNCTION__));
+ ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveStatistics(snapshot_id));
+ return *this;
}
TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics(
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index 6631b9d2..cfae0ce2 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -398,8 +398,7 @@ class ICEBERG_EXPORT TableMetadataBuilder : public
ErrorCollector {
///
/// \param statistics_file The statistics file to set
/// \return Reference to this builder for method chaining
- TableMetadataBuilder& SetStatistics(
- const std::shared_ptr<StatisticsFile>& statistics_file);
+ TableMetadataBuilder& SetStatistics(std::shared_ptr<StatisticsFile>
statistics_file);
/// \brief Remove table statistics by snapshot ID
///
diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc
index 7a01bdee..5866b61d 100644
--- a/src/iceberg/table_update.cc
+++ b/src/iceberg/table_update.cc
@@ -22,8 +22,10 @@
#include "iceberg/exception.h"
#include "iceberg/schema.h"
#include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_requirements.h"
+#include "iceberg/util/checked_cast.h"
namespace iceberg {
TableUpdate::~TableUpdate() = default;
@@ -45,7 +47,7 @@ bool AssignUUID::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kAssignUUID) {
return false;
}
- const auto& other_assign = static_cast<const AssignUUID&>(other);
+ const auto& other_assign = internal::checked_cast<const AssignUUID&>(other);
return uuid_ == other_assign.uuid_;
}
@@ -67,7 +69,7 @@ bool UpgradeFormatVersion::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kUpgradeFormatVersion) {
return false;
}
- const auto& other_upgrade = static_cast<const UpgradeFormatVersion&>(other);
+ const auto& other_upgrade = internal::checked_cast<const
UpgradeFormatVersion&>(other);
return format_version_ == other_upgrade.format_version_;
}
@@ -117,7 +119,7 @@ bool SetCurrentSchema::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kSetCurrentSchema) {
return false;
}
- const auto& other_set = static_cast<const SetCurrentSchema&>(other);
+ const auto& other_set = internal::checked_cast<const
SetCurrentSchema&>(other);
return schema_id_ == other_set.schema_id_;
}
@@ -167,7 +169,7 @@ bool SetDefaultPartitionSpec::Equals(const TableUpdate&
other) const {
if (other.kind() != Kind::kSetDefaultPartitionSpec) {
return false;
}
- const auto& other_set = static_cast<const SetDefaultPartitionSpec&>(other);
+ const auto& other_set = internal::checked_cast<const
SetDefaultPartitionSpec&>(other);
return spec_id_ == other_set.spec_id_;
}
@@ -190,7 +192,7 @@ bool RemovePartitionSpecs::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kRemovePartitionSpecs) {
return false;
}
- const auto& other_remove = static_cast<const RemovePartitionSpecs&>(other);
+ const auto& other_remove = internal::checked_cast<const
RemovePartitionSpecs&>(other);
return spec_ids_ == other_remove.spec_ids_;
}
@@ -213,7 +215,7 @@ bool RemoveSchemas::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kRemoveSchemas) {
return false;
}
- const auto& other_remove = static_cast<const RemoveSchemas&>(other);
+ const auto& other_remove = internal::checked_cast<const
RemoveSchemas&>(other);
return schema_ids_ == other_remove.schema_ids_;
}
@@ -263,7 +265,7 @@ bool SetDefaultSortOrder::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kSetDefaultSortOrder) {
return false;
}
- const auto& other_set = static_cast<const SetDefaultSortOrder&>(other);
+ const auto& other_set = internal::checked_cast<const
SetDefaultSortOrder&>(other);
return sort_order_id_ == other_set.sort_order_id_;
}
@@ -313,7 +315,7 @@ bool RemoveSnapshots::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kRemoveSnapshots) {
return false;
}
- const auto& other_remove = static_cast<const RemoveSnapshots&>(other);
+ const auto& other_remove = internal::checked_cast<const
RemoveSnapshots&>(other);
return snapshot_ids_ == other_remove.snapshot_ids_;
}
@@ -335,7 +337,7 @@ bool RemoveSnapshotRef::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kRemoveSnapshotRef) {
return false;
}
- const auto& other_remove = static_cast<const RemoveSnapshotRef&>(other);
+ const auto& other_remove = internal::checked_cast<const
RemoveSnapshotRef&>(other);
return ref_name_ == other_remove.ref_name_;
}
@@ -366,7 +368,7 @@ bool SetSnapshotRef::Equals(const TableUpdate& other) const
{
if (other.kind() != Kind::kSetSnapshotRef) {
return false;
}
- const auto& other_set = static_cast<const SetSnapshotRef&>(other);
+ const auto& other_set = internal::checked_cast<const SetSnapshotRef&>(other);
return ref_name_ == other_set.ref_name_ && snapshot_id_ ==
other_set.snapshot_id_ &&
type_ == other_set.type_ &&
min_snapshots_to_keep_ == other_set.min_snapshots_to_keep_ &&
@@ -394,7 +396,7 @@ bool SetProperties::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetProperties) {
return false;
}
- const auto& other_set = static_cast<const SetProperties&>(other);
+ const auto& other_set = internal::checked_cast<const SetProperties&>(other);
return updated_ == other_set.updated_;
}
@@ -416,7 +418,7 @@ bool RemoveProperties::Equals(const TableUpdate& other)
const {
if (other.kind() != Kind::kRemoveProperties) {
return false;
}
- const auto& other_remove = static_cast<const RemoveProperties&>(other);
+ const auto& other_remove = internal::checked_cast<const
RemoveProperties&>(other);
return removed_ == other_remove.removed_;
}
@@ -438,7 +440,7 @@ bool SetLocation::Equals(const TableUpdate& other) const {
if (other.kind() != Kind::kSetLocation) {
return false;
}
- const auto& other_set = static_cast<const SetLocation&>(other);
+ const auto& other_set = internal::checked_cast<const SetLocation&>(other);
return location_ == other_set.location_;
}
@@ -446,4 +448,56 @@ std::unique_ptr<TableUpdate> SetLocation::Clone() const {
return std::make_unique<SetLocation>(location_);
}
+// SetStatistics
+
+int64_t SetStatistics::snapshot_id() const {
+  // statistics_file_ may be null: Equals() and the JSON serializer both accept a
+  // null payload. Never dereference it blindly; report -1 (the "no snapshot"
+  // sentinel used by Iceberg table metadata) instead of invoking UB.
+  return statistics_file_ != nullptr ? statistics_file_->snapshot_id : -1;
+}
+
+// Replays this update onto `builder`. A null payload is rejected there by the
+// builder's own precheck.
+void SetStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.SetStatistics(statistics_file_);
+}
+
+void SetStatistics::GenerateRequirements(TableUpdateContext& context) const {
+  // SetStatistics doesn't generate any requirements
+}
+
+// Two SetStatistics updates are equal when both payloads are null, or when both
+// are non-null and compare equal by value (not by pointer identity).
+bool SetStatistics::Equals(const TableUpdate& other) const {
+  if (other.kind() != Kind::kSetStatistics) {
+    return false;
+  }
+  const auto& other_set = internal::checked_cast<const SetStatistics&>(other);
+  if (statistics_file_ == nullptr || other_set.statistics_file_ == nullptr) {
+    // Equal only if both are null; null never equals a non-null payload.
+    return statistics_file_ == other_set.statistics_file_;
+  }
+  return *statistics_file_ == *other_set.statistics_file_;
+}
+
+// The clone shares the same StatisticsFile instance (shallow copy).
+std::unique_ptr<TableUpdate> SetStatistics::Clone() const {
+  return std::make_unique<SetStatistics>(statistics_file_);
+}
+
+// RemoveStatistics
+
+// Replays this update onto `builder`.
+void RemoveStatistics::ApplyTo(TableMetadataBuilder& builder) const {
+  builder.RemoveStatistics(snapshot_id_);
+}
+
+void RemoveStatistics::GenerateRequirements(TableUpdateContext& context) const {
+  // RemoveStatistics doesn't generate any requirements
+}
+
+bool RemoveStatistics::Equals(const TableUpdate& other) const {
+  if (other.kind() != Kind::kRemoveStatistics) {
+    return false;
+  }
+  const auto& other_remove = internal::checked_cast<const RemoveStatistics&>(other);
+  return snapshot_id_ == other_remove.snapshot_id_;
+}
+
+std::unique_ptr<TableUpdate> RemoveStatistics::Clone() const {
+  return std::make_unique<RemoveStatistics>(snapshot_id_);
+}
+
} // namespace iceberg::table
diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h
index 3c9c9dbb..5bbc243e 100644
--- a/src/iceberg/table_update.h
+++ b/src/iceberg/table_update.h
@@ -59,6 +59,8 @@ class ICEBERG_EXPORT TableUpdate {
kSetProperties,
kRemoveProperties,
kSetLocation,
+ kSetStatistics,
+ kRemoveStatistics,
};
virtual ~TableUpdate();
@@ -509,6 +511,53 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate {
std::string location_;
};
+/// \brief Represents setting (adding or replacing) the statistics file of a
+/// snapshot.
+///
+/// Serialized with action "set-statistics". The carried file may be null:
+/// Equals() and the JSON serializer accept that, but applying it to a
+/// TableMetadataBuilder is rejected.
+class ICEBERG_EXPORT SetStatistics : public TableUpdate {
+ public:
+ explicit SetStatistics(std::shared_ptr<StatisticsFile> statistics_file)
+ : statistics_file_(std::move(statistics_file)) {}
+
+ /// \brief Snapshot id of the carried statistics file.
+ int64_t snapshot_id() const;
+
+ /// \brief The carried statistics file (shared, possibly null).
+ const std::shared_ptr<StatisticsFile>& statistics_file() const {
+ return statistics_file_;
+ }
+
+ /// \brief Replays this update via TableMetadataBuilder::SetStatistics.
+ void ApplyTo(TableMetadataBuilder& builder) const override;
+
+ /// \brief Adds no requirements.
+ void GenerateRequirements(TableUpdateContext& context) const override;
+
+ Kind kind() const override { return Kind::kSetStatistics; }
+
+ /// \brief Compares the carried statistics files by value.
+ bool Equals(const TableUpdate& other) const override;
+
+ std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+ std::shared_ptr<StatisticsFile> statistics_file_;
+};
+
+/// \brief Represents removing the statistics file of a snapshot.
+///
+/// Serialized with action "remove-statistics". Applying it when no statistics
+/// exist for the snapshot leaves the metadata unchanged.
+class ICEBERG_EXPORT RemoveStatistics : public TableUpdate {
+ public:
+ explicit RemoveStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {}
+
+ /// \brief Snapshot whose statistics are removed.
+ int64_t snapshot_id() const { return snapshot_id_; }
+
+ /// \brief Replays this update via TableMetadataBuilder::RemoveStatistics.
+ void ApplyTo(TableMetadataBuilder& builder) const override;
+
+ /// \brief Adds no requirements.
+ void GenerateRequirements(TableUpdateContext& context) const override;
+
+ Kind kind() const override { return Kind::kRemoveStatistics; }
+
+ bool Equals(const TableUpdate& other) const override;
+
+ std::unique_ptr<TableUpdate> Clone() const override;
+
+ private:
+ int64_t snapshot_id_;
+};
+
} // namespace table
} // namespace iceberg
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 62864cd6..1bd2fd6a 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -180,7 +180,8 @@ if(ICEBERG_BUILD_BUNDLE)
update_partition_spec_test.cc
update_properties_test.cc
update_schema_test.cc
- update_sort_order_test.cc)
+ update_sort_order_test.cc
+ update_statistics_test.cc)
add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc)
diff --git a/src/iceberg/test/json_internal_test.cc
b/src/iceberg/test/json_internal_test.cc
index d6a171e0..bb167ad0 100644
--- a/src/iceberg/test/json_internal_test.cc
+++ b/src/iceberg/test/json_internal_test.cc
@@ -31,6 +31,7 @@
#include "iceberg/snapshot.h"
#include "iceberg/sort_field.h"
#include "iceberg/sort_order.h"
+#include "iceberg/statistics_file.h"
#include "iceberg/table_requirement.h"
#include "iceberg/table_update.h"
#include "iceberg/test/matchers.h"
@@ -564,6 +565,54 @@ TEST(JsonInternalTest, TableUpdateSetLocation) {
EXPECT_EQ(*internal::checked_cast<table::SetLocation*>(parsed.value().get()),
update);
}
+// Round-trips a fully populated SetStatistics through ToJson/TableUpdateFromJson
+// and pins the exact "set-statistics" wire format, including blob metadata.
+TEST(JsonInternalTest, TableUpdateSetStatistics) {
+ auto stats_file = std::make_shared<StatisticsFile>();
+ stats_file->snapshot_id = 123456789;
+ stats_file->path =
"s3://bucket/warehouse/table/metadata/stats-123456789.puffin";
+ stats_file->file_size_in_bytes = 1024;
+ stats_file->file_footer_size_in_bytes = 128;
+ stats_file->blob_metadata = {BlobMetadata{.type = "ndv",
+ .source_snapshot_id = 123456789,
+ .source_snapshot_sequence_number =
1,
+ .fields = {1, 2},
+ .properties = {{"prop1",
"value1"}}}};
+
+ table::SetStatistics update(stats_file);
+ nlohmann::json expected = R"({
+ "action": "set-statistics",
+ "statistics": {
+ "snapshot-id": 123456789,
+ "statistics-path":
"s3://bucket/warehouse/table/metadata/stats-123456789.puffin",
+ "file-size-in-bytes": 1024,
+ "file-footer-size-in-bytes": 128,
+ "blob-metadata": [{
+ "type": "ndv",
+ "snapshot-id": 123456789,
+ "sequence-number": 1,
+ "fields": [1, 2],
+ "properties": {"prop1": "value1"}
+ }]
+ }
+ })"_json;
+
+ // Serialize, then parse back and compare by value via SetStatistics::Equals.
+ EXPECT_EQ(ToJson(update), expected);
+ auto parsed = TableUpdateFromJson(expected);
+ ASSERT_THAT(parsed, IsOk());
+
EXPECT_EQ(*internal::checked_cast<table::SetStatistics*>(parsed.value().get()),
update);
+}
+
+// Round-trips RemoveStatistics and pins the "remove-statistics" wire format.
+TEST(JsonInternalTest, TableUpdateRemoveStatistics) {
+ table::RemoveStatistics update(123456789);
+ nlohmann::json expected =
+ R"({"action":"remove-statistics","snapshot-id":123456789})"_json;
+
+ EXPECT_EQ(ToJson(update), expected);
+ auto parsed = TableUpdateFromJson(expected);
+ ASSERT_THAT(parsed, IsOk());
+
EXPECT_EQ(*internal::checked_cast<table::RemoveStatistics*>(parsed.value().get()),
+ update);
+}
+
TEST(JsonInternalTest, TableUpdateUnknownAction) {
nlohmann::json json = R"({"action":"unknown-action"})"_json;
auto result = TableUpdateFromJson(json);
diff --git a/src/iceberg/test/update_statistics_test.cc
b/src/iceberg/test/update_statistics_test.cc
new file mode 100644
index 00000000..7b9d4f58
--- /dev/null
+++ b/src/iceberg/test/update_statistics_test.cc
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_statistics.h"
+
+#include <algorithm>
+#include <memory>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/update_test_base.h"
+
+namespace iceberg {
+
+// Fixture for UpdateStatistics; table_, catalog_ and table_ident_ come from
+// UpdateTestBase.
+class UpdateStatisticsTest : public UpdateTestBase {
+ protected:
+ // Builds a StatisticsFile for `snapshot_id` at `path` carrying a single
+ // theta-sketch blob (fields {1, 2}, sequence number 1, property ndv=100).
+ std::shared_ptr<StatisticsFile> MakeStatisticsFile(int64_t snapshot_id,
+ const std::string& path,
+ int64_t file_size = 1024,
+ int64_t footer_size =
128) {
+ auto stats_file = std::make_shared<StatisticsFile>();
+ stats_file->snapshot_id = snapshot_id;
+ stats_file->path = path;
+ stats_file->file_size_in_bytes = file_size;
+ stats_file->file_footer_size_in_bytes = footer_size;
+
+ BlobMetadata blob;
+ blob.type = "apache-datasketches-theta-v1";
+ blob.source_snapshot_id = snapshot_id;
+ blob.source_snapshot_sequence_number = 1;
+ blob.fields = {1, 2};
+ blob.properties = {{"ndv", "100"}};
+ stats_file->blob_metadata.push_back(blob);
+
+ return stats_file;
+ }
+
+ // Returns the statistics file paired with `snapshot_id` in an Apply() result's
+ // to_set, or nullptr when that snapshot has no entry.
+ std::shared_ptr<StatisticsFile> FindStatistics(
+ const std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>>&
to_set,
+ int64_t snapshot_id) {
+ auto it = std::ranges::find_if(
+ to_set, [snapshot_id](const auto& p) { return p.first == snapshot_id;
});
+ return it != to_set.end() ? it->second : nullptr;
+ }
+};
+
+// With nothing staged, Apply() yields empty to_set and to_remove.
+TEST_F(UpdateStatisticsTest, EmptyUpdate) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_TRUE(result.to_remove.empty());
+}
+
+// A single SetStatistics shows up in to_set as the same shared_ptr instance.
+TEST_F(UpdateStatisticsTest, SetStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ auto stats_file =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+ update->SetStatistics(stats_file);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 1);
+ EXPECT_TRUE(result.to_remove.empty());
+ EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file);
+}
+
+// Chained sets for distinct snapshots are all kept.
+TEST_F(UpdateStatisticsTest, SetMultipleStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ auto stats_file_1 =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+ auto stats_file_2 =
+ MakeStatisticsFile(2, "/warehouse/test_table/metadata/stats-2.puffin");
+
+ update->SetStatistics(stats_file_1).SetStatistics(stats_file_2);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 2);
+ EXPECT_TRUE(result.to_remove.empty());
+ EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file_1);
+ EXPECT_EQ(FindStatistics(result.to_set, 2), stats_file_2);
+}
+
+// A single removal is reported in to_remove.
+TEST_F(UpdateStatisticsTest, RemoveStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ update->RemoveStatistics(1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_EQ(result.to_remove.size(), 1);
+ EXPECT_THAT(result.to_remove, ::testing::Contains(1));
+}
+
+// Several removals are all reported; ordering is not asserted.
+TEST_F(UpdateStatisticsTest, RemoveMultipleStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ update->RemoveStatistics(1).RemoveStatistics(2).RemoveStatistics(3);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_EQ(result.to_remove.size(), 3);
+ EXPECT_THAT(result.to_remove, ::testing::UnorderedElementsAre(1, 2, 3));
+}
+
+// Set and remove on different snapshots do not interfere.
+TEST_F(UpdateStatisticsTest, SetAndRemoveDifferentSnapshots) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ auto stats_file =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+
+ update->SetStatistics(stats_file).RemoveStatistics(2);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 1);
+ EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file);
+ EXPECT_EQ(result.to_remove.size(), 1);
+ EXPECT_THAT(result.to_remove, ::testing::Contains(2));
+}
+
+// A second set for the same snapshot replaces the first (last write wins).
+TEST_F(UpdateStatisticsTest, ReplaceStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ auto stats_file_1 =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1a.puffin");
+ auto stats_file_2 =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1b.puffin",
2048, 256);
+
+ // Set statistics for snapshot 1, then replace it
+ update->SetStatistics(stats_file_1).SetStatistics(stats_file_2);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 1);
+ EXPECT_TRUE(result.to_remove.empty());
+ // Should have the second one (replacement)
+ EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file_2);
+ EXPECT_NE(FindStatistics(result.to_set, 1), stats_file_1);
+}
+
+// A later remove cancels an earlier set for the same snapshot.
+TEST_F(UpdateStatisticsTest, SetThenRemoveSameSnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ auto stats_file =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+
+ // Set statistics for snapshot 1, then remove it
+ update->SetStatistics(stats_file).RemoveStatistics(1);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_TRUE(result.to_set.empty());
+ EXPECT_EQ(result.to_remove.size(), 1);
+ EXPECT_THAT(result.to_remove, ::testing::Contains(1));
+}
+
+// A later set cancels an earlier remove for the same snapshot.
+TEST_F(UpdateStatisticsTest, RemoveThenSetSameSnapshot) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+ auto stats_file =
+ MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin");
+
+ // Remove statistics for snapshot 1, then set new ones
+ update->RemoveStatistics(1).SetStatistics(stats_file);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply());
+ EXPECT_EQ(result.to_set.size(), 1);
+ EXPECT_TRUE(result.to_remove.empty());
+ EXPECT_EQ(FindStatistics(result.to_set, 1), stats_file);
+}
+
+// Setting a null file is not rejected eagerly; the error surfaces from Apply().
+TEST_F(UpdateStatisticsTest, SetNullStatistics) {
+ ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics());
+
+ update->SetStatistics(nullptr);
+
+ auto result = update->Apply();
+ EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed));
+ EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null"));
+}
+
+// End-to-end: commits an empty update, then a SetStatistics for the current
+// snapshot, and verifies the file is persisted in the reloaded table metadata.
+TEST_F(UpdateStatisticsTest, CommitSuccess) {
+  // Test empty commit. Fatal on failure: every later step depends on it.
+  ICEBERG_UNWRAP_OR_FAIL(auto empty_update, table_->NewUpdateStatistics());
+  ASSERT_THAT(empty_update->Commit(), IsOk());
+
+  // Reload table after first commit
+  ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_));
+
+  // Get a valid snapshot ID from the table
+  ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot());
+  int64_t snapshot_id = current_snapshot->snapshot_id;
+
+  // Test commit with statistics changes
+  ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateStatistics());
+  auto stats_file =
+      MakeStatisticsFile(snapshot_id, "/warehouse/test_table/metadata/stats-1.puffin");
+  update->SetStatistics(stats_file);
+
+  ASSERT_THAT(update->Commit(), IsOk());
+
+  // Verify the statistics were committed and persisted. Keep the metadata alive
+  // in a local while `statistics` refers into it, and ASSERT (not EXPECT) on the
+  // size so that statistics[0] is never read out of bounds.
+  ICEBERG_UNWRAP_OR_FAIL(auto final_table, catalog_->LoadTable(table_ident_));
+  auto final_metadata = final_table->metadata();
+  const auto& statistics = final_metadata->statistics;
+  ASSERT_EQ(statistics.size(), 1u);
+  ASSERT_NE(statistics[0], nullptr);
+  EXPECT_EQ(*statistics[0], *stats_file);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index 0f950b02..7bd4a577 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -25,6 +25,7 @@
#include "iceberg/catalog.h"
#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
+#include "iceberg/statistics_file.h"
#include "iceberg/table.h"
#include "iceberg/table_metadata.h"
#include "iceberg/table_properties.h"
@@ -41,6 +42,7 @@
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_sort_order.h"
+#include "iceberg/update/update_statistics.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/location_util.h"
#include "iceberg/util/macros.h"
@@ -198,6 +200,16 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->AssignUUID();
}
} break;
+ case PendingUpdate::Kind::kUpdateStatistics: {
+ auto& update_statistics =
internal::checked_cast<UpdateStatistics&>(update);
+ ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply());
+ for (auto&& [_, stat_file] : result.to_set) {
+ metadata_builder_->SetStatistics(std::move(stat_file));
+ }
+ for (const auto& snapshot_id : result.to_remove) {
+ metadata_builder_->RemoveStatistics(snapshot_id);
+ }
+ } break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
@@ -316,4 +328,11 @@ Result<std::shared_ptr<FastAppend>>
Transaction::NewFastAppend() {
return fast_append;
}
+// Creates an UpdateStatistics bound to this transaction and registers it with
+// AddUpdate(). Errors from either step are propagated. When committed, it is
+// applied through the kUpdateStatistics branch of Transaction::Apply.
+Result<std::shared_ptr<UpdateStatistics>> Transaction::NewUpdateStatistics() {
+ ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateStatistics> update_statistics,
+ UpdateStatistics::Make(shared_from_this()));
+ ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_statistics));
+ return update_statistics;
+}
+
} // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 23b01fb1..6d7816ee 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public
std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();
+ /// \brief Create a new UpdateStatistics to update table statistics and
commit the
+ /// changes.
+ Result<std::shared_ptr<UpdateStatistics>> NewUpdateStatistics();
+
/// \brief Create a new UpdateLocation to update the table location and
commit the
/// changes.
Result<std::shared_ptr<UpdateLocation>> NewUpdateLocation();
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index 40736419..2e099c1b 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -199,6 +199,7 @@ class UpdatePartitionSpec;
class UpdateProperties;
class UpdateSchema;
class UpdateSortOrder;
+class UpdateStatistics;
/// ----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 2bbd1037..8dc92c00 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -27,6 +27,7 @@ install_headers(
'update_schema.h',
'update_sort_order.h',
'update_properties.h',
+ 'update_statistics.h',
],
subdir: 'iceberg/update',
)
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index 68c2d205..0c0b6e3e 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -50,6 +50,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdateSchema,
kUpdateSnapshot,
kUpdateSortOrder,
+ kUpdateStatistics,
};
/// \brief Return the kind of this pending update.
diff --git a/src/iceberg/update/update_statistics.cc b/src/iceberg/update/update_statistics.cc
new file mode 100644
index 00000000..46145336
--- /dev/null
+++ b/src/iceberg/update/update_statistics.cc
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_statistics.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/result.h"
+#include "iceberg/statistics_file.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/macros.h"
+
+namespace iceberg {
+
+Result<std::shared_ptr<UpdateStatistics>> UpdateStatistics::Make(
+    std::shared_ptr<Transaction> transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create UpdateStatistics without a transaction");
+  // The constructor is private, so std::make_shared cannot reach it.
+  return std::shared_ptr<UpdateStatistics>(new UpdateStatistics(std::move(transaction)));
+}
+
+UpdateStatistics::UpdateStatistics(std::shared_ptr<Transaction> transaction)
+    : PendingUpdate(std::move(transaction)) {}
+
+UpdateStatistics::~UpdateStatistics() = default;
+
+UpdateStatistics& UpdateStatistics::SetStatistics(
+    std::shared_ptr<StatisticsFile> statistics_file) {
+  ICEBERG_BUILDER_CHECK(statistics_file != nullptr, "Statistics file cannot be null");
+  // Capture the key before ownership is transferred into the map, so the lookup
+  // never reads through a pointer that is being moved from in the same expression.
+  const int64_t snapshot_id = statistics_file->snapshot_id;
+  statistics_to_set_[snapshot_id] = std::move(statistics_file);
+  return *this;
+}
+
+UpdateStatistics& UpdateStatistics::RemoveStatistics(int64_t snapshot_id) {
+  // A null entry marks the snapshot's statistics for removal. Because it shares the
+  // map slot with SetStatistics, the most recent call for a snapshot wins.
+  statistics_to_set_[snapshot_id] = nullptr;
+  return *this;
+}
+
+Result<UpdateStatistics::ApplyResult> UpdateStatistics::Apply() {
+  // Surface any error recorded by earlier builder calls (e.g. the null check in
+  // SetStatistics) before producing changes.
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+
+  ApplyResult result;
+  for (const auto& [snapshot_id, stats] : statistics_to_set_) {
+    if (stats) {
+      result.to_set.emplace_back(snapshot_id, stats);
+    } else {
+      result.to_remove.push_back(snapshot_id);
+    }
+  }
+
+  // statistics_to_set_ is a hash map, so the loop above visits snapshots in an
+  // unspecified order. Sort by snapshot ID so the changes handed to the metadata
+  // builder are deterministic and reproducible.
+  std::sort(result.to_set.begin(), result.to_set.end(),
+            [](const auto& lhs, const auto& rhs) { return lhs.first < rhs.first; });
+  std::sort(result.to_remove.begin(), result.to_remove.end());
+  return result;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h
new file mode 100644
index 00000000..55e50fb1
--- /dev/null
+++ b/src/iceberg/update/update_statistics.h
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <cstdint>
+#include <memory>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/update_statistics.h
+/// \brief Updates table statistics.
+
+namespace iceberg {
+
+/// \brief Pending update that sets or removes table statistics files.
+///
+/// Changes are keyed by snapshot ID: for any given snapshot only the most recent
+/// SetStatistics() or RemoveStatistics() call is kept.
+class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate {
+ public:
+  /// \brief Create an UpdateStatistics that belongs to \p transaction.
+  ///
+  /// \param transaction The owning transaction; must not be null
+  /// \return The new update, or an error if \p transaction is null
+  static Result<std::shared_ptr<UpdateStatistics>> Make(
+      std::shared_ptr<Transaction> transaction);
+
+  ~UpdateStatistics() override;
+
+  /// \brief Set statistics file for a snapshot.
+  ///
+  /// Associates a statistics file with the snapshot ID stored in the file. If
+  /// statistics are already set for this snapshot, or were marked for removal
+  /// earlier in this update, they are replaced.
+  ///
+  /// \param statistics_file The statistics file to set; must not be null (a null
+  /// file is rejected and the error is reported by Apply())
+  /// \return Reference to this UpdateStatistics for chaining
+  UpdateStatistics& SetStatistics(std::shared_ptr<StatisticsFile> statistics_file);
+
+  /// \brief Remove statistics for a snapshot.
+  ///
+  /// Marks the statistics for the given snapshot ID for removal, replacing any
+  /// statistics file set for that snapshot earlier in this update.
+  ///
+  /// \param snapshot_id The snapshot ID whose statistics to remove
+  /// \return Reference to this UpdateStatistics for chaining
+  UpdateStatistics& RemoveStatistics(int64_t snapshot_id);
+
+  /// \brief Identifies this update as Kind::kUpdateStatistics.
+  Kind kind() const final { return Kind::kUpdateStatistics; }
+
+  /// \brief The changes produced by Apply().
+  struct ApplyResult {
+    /// Statistics files to set, each paired with the snapshot ID it applies to.
+    std::vector<std::pair<int64_t, std::shared_ptr<StatisticsFile>>> to_set;
+    /// Snapshot IDs whose statistics files should be removed.
+    std::vector<int64_t> to_remove;
+  };
+
+  /// \brief Validate the pending changes and split them into sets and removals.
+  ///
+  /// \return The statistics files to set and the snapshot IDs to clear, or an
+  /// error if validation failed (e.g. a null file was passed to SetStatistics())
+  Result<ApplyResult> Apply();
+
+ private:
+  explicit UpdateStatistics(std::shared_ptr<Transaction> transaction);
+
+  /// Pending changes keyed by snapshot ID. A null value means "remove the
+  /// statistics for this snapshot" rather than "set".
+  std::unordered_map<int64_t, std::shared_ptr<StatisticsFile>> statistics_to_set_;
+};
+
+}  // namespace iceberg