This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new dba8f922 feat: add snapshot util (#420)
dba8f922 is described below
commit dba8f9223d3f71356c80334d1cdf00a5f6dcce03
Author: Junwang Zhao <[email protected]>
AuthorDate: Mon Dec 22 17:14:06 2025 +0800
feat: add snapshot util (#420)
---
src/iceberg/CMakeLists.txt | 1 +
src/iceberg/avro/avro_stream_internal.cc | 17 +-
src/iceberg/exception.h | 2 +-
src/iceberg/meson.build | 1 +
src/iceberg/table_metadata.cc | 10 +-
src/iceberg/table_metadata.h | 2 +-
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/meson.build | 1 +
src/iceberg/test/snapshot_util_test.cc | 374 ++++++++++++++++++++++++++++++
src/iceberg/type.cc | 24 +-
src/iceberg/util/decimal.cc | 4 +-
src/iceberg/util/macros.h | 17 ++
src/iceberg/util/snapshot_util.cc | 323 ++++++++++++++++++++++++++
src/iceberg/util/snapshot_util_internal.h | 276 ++++++++++++++++++++++
src/iceberg/util/timepoint.cc | 20 ++
src/iceberg/util/timepoint.h | 3 +
src/iceberg/util/uuid.cc | 2 +-
17 files changed, 1049 insertions(+), 29 deletions(-)
diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 9c25015c..0579c67d 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -83,6 +83,7 @@ set(ICEBERG_SOURCES
util/decimal.cc
util/gzip_internal.cc
util/murmurhash3_internal.cc
+ util/snapshot_util.cc
util/temporal_util.cc
util/timepoint.cc
util/truncate_util.cc
diff --git a/src/iceberg/avro/avro_stream_internal.cc b/src/iceberg/avro/avro_stream_internal.cc
index f299b523..e868bab4 100644
--- a/src/iceberg/avro/avro_stream_internal.cc
+++ b/src/iceberg/avro/avro_stream_internal.cc
@@ -66,8 +66,9 @@ bool AvroInputStream::next(const uint8_t** data, size_t* len) {
}
void AvroInputStream::backup(size_t len) {
- ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available",
- len, buffer_pos_);
+ ICEBERG_CHECK_OR_DIE(len <= buffer_pos_,
+ "Cannot backup {} bytes, only {} bytes available", len,
+ buffer_pos_);
buffer_pos_ -= len;
byte_count_ -= len;
@@ -88,7 +89,8 @@ size_t AvroInputStream::byteCount() const { return byte_count_; }
void AvroInputStream::seek(int64_t position) {
auto status = input_stream_->Seek(position);
- ICEBERG_CHECK(status.ok(), "Failed to seek to {}, got {}", position, status.ToString());
+ ICEBERG_CHECK_OR_DIE(status.ok(), "Failed to seek to {}, got {}", position,
+ status.ToString());
buffer_pos_ = 0;
available_bytes_ = 0;
@@ -116,8 +118,9 @@ bool AvroOutputStream::next(uint8_t** data, size_t* len) {
}
void AvroOutputStream::backup(size_t len) {
- ICEBERG_CHECK(len <= buffer_pos_, "Cannot backup {} bytes, only {} bytes available",
- len, buffer_pos_);
+ ICEBERG_CHECK_OR_DIE(len <= buffer_pos_,
+ "Cannot backup {} bytes, only {} bytes available", len,
+ buffer_pos_);
buffer_pos_ -= len;
}
@@ -126,12 +129,12 @@ uint64_t AvroOutputStream::byteCount() const { return flushed_bytes_ + buffer_po
void AvroOutputStream::flush() {
if (buffer_pos_ > 0) {
auto status = output_stream_->Write(buffer_.data(), buffer_pos_);
- ICEBERG_CHECK(status.ok(), "Write failed {}", status.ToString());
+ ICEBERG_CHECK_OR_DIE(status.ok(), "Write failed {}", status.ToString());
flushed_bytes_ += buffer_pos_;
buffer_pos_ = 0;
}
auto status = output_stream_->Flush();
- ICEBERG_CHECK(status.ok(), "Flush failed {}", status.ToString());
+ ICEBERG_CHECK_OR_DIE(status.ok(), "Flush failed {}", status.ToString());
}
const std::shared_ptr<::arrow::io::OutputStream>& AvroOutputStream::arrow_output_stream()
diff --git a/src/iceberg/exception.h b/src/iceberg/exception.h
index bbb9c228..0333eb12 100644
--- a/src/iceberg/exception.h
+++ b/src/iceberg/exception.h
@@ -44,7 +44,7 @@ class ICEBERG_EXPORT ExpressionError : public IcebergError {
explicit ExpressionError(const std::string& what) : IcebergError(what) {}
};
-#define ICEBERG_CHECK(condition, ...) \
+#define ICEBERG_CHECK_OR_DIE(condition, ...) \
do { \
if (!(condition)) [[unlikely]] { \
throw iceberg::IcebergError(std::format(__VA_ARGS__)); \
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 21a41dcf..850f6590 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -105,6 +105,7 @@ iceberg_sources = files(
'util/decimal.cc',
'util/gzip_internal.cc',
'util/murmurhash3_internal.cc',
+ 'util/snapshot_util.cc',
'util/temporal_util.cc',
'util/timepoint.cc',
'util/truncate_util.cc',
diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc
index 9bf6c530..2136494f 100644
--- a/src/iceberg/table_metadata.cc
+++ b/src/iceberg/table_metadata.cc
@@ -70,9 +70,9 @@ Result<std::shared_ptr<Schema>> TableMetadata::Schema() const {
}
Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
- const std::optional<int32_t>& schema_id) const {
+ std::optional<int32_t> schema_id) const {
auto iter = std::ranges::find_if(schemas, [schema_id](const auto& schema) {
- return schema->schema_id() == schema_id;
+ return schema != nullptr && schema->schema_id() == schema_id;
});
if (iter == schemas.end()) {
return NotFound("Schema with ID {} is not found", schema_id.value_or(-1));
@@ -82,7 +82,7 @@ Result<std::shared_ptr<Schema>> TableMetadata::SchemaById(
Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
auto iter = std::ranges::find_if(partition_specs, [this](const auto& spec) {
- return spec->spec_id() == default_spec_id;
+ return spec != nullptr && spec->spec_id() == default_spec_id;
});
if (iter == partition_specs.end()) {
return NotFound("Default partition spec is not found");
@@ -92,7 +92,7 @@ Result<std::shared_ptr<PartitionSpec>> TableMetadata::PartitionSpec() const {
Result<std::shared_ptr<SortOrder>> TableMetadata::SortOrder() const {
auto iter = std::ranges::find_if(sort_orders, [this](const auto& order) {
- return order->order_id() == default_sort_order_id;
+ return order != nullptr && order->order_id() == default_sort_order_id;
});
if (iter == sort_orders.end()) {
return NotFound("Default sort order is not found");
@@ -106,7 +106,7 @@ Result<std::shared_ptr<Snapshot>> TableMetadata::Snapshot() const {
Result<std::shared_ptr<Snapshot>> TableMetadata::SnapshotById(int64_t snapshot_id) const {
auto iter = std::ranges::find_if(snapshots, [snapshot_id](const auto& snapshot) {
- return snapshot->snapshot_id == snapshot_id;
+ return snapshot != nullptr && snapshot->snapshot_id == snapshot_id;
});
if (iter == snapshots.end()) {
return NotFound("Snapshot with ID {} is not found", snapshot_id);
diff --git a/src/iceberg/table_metadata.h b/src/iceberg/table_metadata.h
index f7c26011..3f2f3610 100644
--- a/src/iceberg/table_metadata.h
+++ b/src/iceberg/table_metadata.h
@@ -128,7 +128,7 @@ struct ICEBERG_EXPORT TableMetadata {
Result<std::shared_ptr<iceberg::Schema>> Schema() const;
/// \brief Get the current schema by ID, return NotFoundError if not found
Result<std::shared_ptr<iceberg::Schema>> SchemaById(
- const std::optional<int32_t>& schema_id) const;
+ std::optional<int32_t> schema_id) const;
/// \brief Get the current partition spec, return NotFoundError if not found
Result<std::shared_ptr<iceberg::PartitionSpec>> PartitionSpec() const;
/// \brief Get the current sort order, return NotFoundError if not found
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index 2af7d1c4..28178b88 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -70,6 +70,7 @@ add_iceberg_test(table_test
SOURCES
metrics_config_test.cc
snapshot_test.cc
+ snapshot_util_test.cc
table_metadata_builder_test.cc
table_requirement_test.cc
table_requirements_test.cc
diff --git a/src/iceberg/test/meson.build b/src/iceberg/test/meson.build
index 5ccab940..fcd397b9 100644
--- a/src/iceberg/test/meson.build
+++ b/src/iceberg/test/meson.build
@@ -47,6 +47,7 @@ iceberg_tests = {
'sources': files(
'metrics_config_test.cc',
'snapshot_test.cc',
+ 'snapshot_util_test.cc',
'table_metadata_builder_test.cc',
'table_requirement_test.cc',
'table_requirements_test.cc',
diff --git a/src/iceberg/test/snapshot_util_test.cc b/src/iceberg/test/snapshot_util_test.cc
new file mode 100644
index 00000000..e4e17251
--- /dev/null
+++ b/src/iceberg/test/snapshot_util_test.cc
@@ -0,0 +1,374 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <chrono>
+#include <memory>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/sort_order.h"
+#include "iceberg/table.h"
+#include "iceberg/table_identifier.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/test/mock_catalog.h"
+#include "iceberg/test/mock_io.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+namespace {
+
+// Schema for testing: id (int32), data (string)
+std::shared_ptr<Schema> CreateTestSchema() {
+ auto field1 = SchemaField::MakeRequired(1, "id", int32());
+ auto field2 = SchemaField::MakeRequired(2, "data", string());
+ return std::make_shared<Schema>(std::vector<SchemaField>{field1, field2}, 0);
+}
+
+// Helper to create a snapshot
+std::shared_ptr<Snapshot> CreateSnapshot(int64_t snapshot_id,
+ std::optional<int64_t> parent_id,
+ int64_t sequence_number,
+ TimePointMs timestamp_ms) {
+ return std::make_shared<Snapshot>(
+ Snapshot{.snapshot_id = snapshot_id,
+ .parent_snapshot_id = parent_id,
+ .sequence_number = sequence_number,
+ .timestamp_ms = timestamp_ms,
+ .manifest_list =
+ "s3://bucket/manifest-list-" + std::to_string(snapshot_id) + ".avro",
+ .summary = {},
+ .schema_id = 0});
+}
+
+// Helper to create table metadata with snapshots
+std::shared_ptr<TableMetadata> CreateTableMetadataWithSnapshots(
+ int64_t base_snapshot_id, int64_t main1_snapshot_id, int64_t main2_snapshot_id,
+ int64_t branch_snapshot_id, int64_t fork0_snapshot_id, int64_t fork1_snapshot_id,
+ int64_t fork2_snapshot_id, TimePointMs base_timestamp) {
+ auto metadata = std::make_shared<TableMetadata>();
+ metadata->format_version = 2;
+ metadata->table_uuid = "test-uuid-1234";
+ metadata->location = "s3://bucket/test";
+ metadata->last_sequence_number = 10;
+ metadata->last_updated_ms = base_timestamp + std::chrono::milliseconds(3000);
+ metadata->last_column_id = 2;
+ metadata->current_schema_id = 0;
+ metadata->schemas.push_back(CreateTestSchema());
+ metadata->default_spec_id = PartitionSpec::kInitialSpecId;
+ metadata->last_partition_id = 0;
+ metadata->current_snapshot_id = main2_snapshot_id;
+ metadata->default_sort_order_id = SortOrder::kInitialSortOrderId;
+ metadata->sort_orders.push_back(SortOrder::Unsorted());
+ metadata->next_row_id = TableMetadata::kInitialRowId;
+ metadata->properties = TableProperties::default_properties();
+
+ // Create snapshots: base -> main1 -> main2
+ auto base_snapshot = CreateSnapshot(base_snapshot_id, std::nullopt, 1, base_timestamp);
+ auto main1_snapshot = CreateSnapshot(main1_snapshot_id, base_snapshot_id, 2,
+ base_timestamp + std::chrono::milliseconds(1000));
+ auto main2_snapshot = CreateSnapshot(main2_snapshot_id, main1_snapshot_id, 3,
+ base_timestamp + std::chrono::milliseconds(2000));
+
+ // Branch snapshot (from base)
+ auto branch_snapshot = CreateSnapshot(branch_snapshot_id, base_snapshot_id, 4,
+ base_timestamp + std::chrono::milliseconds(1500));
+
+ // Fork branch snapshots: fork0 -> fork1 -> fork2 (fork0 will be expired)
+ auto fork0_snapshot = CreateSnapshot(fork0_snapshot_id, base_snapshot_id, 5,
+ base_timestamp + std::chrono::milliseconds(500));
+ auto fork1_snapshot = CreateSnapshot(fork1_snapshot_id, fork0_snapshot_id, 6,
+ base_timestamp + std::chrono::milliseconds(2500));
+ auto fork2_snapshot = CreateSnapshot(fork2_snapshot_id, fork1_snapshot_id, 7,
+ base_timestamp + std::chrono::milliseconds(3000));
+
+ metadata->snapshots = {base_snapshot, main1_snapshot, main2_snapshot,
+ branch_snapshot, fork1_snapshot, fork2_snapshot};
+ // Note: fork0 is expired, so it's not in the snapshots list
+
+ // Snapshot log
+ metadata->snapshot_log = {
+ {.timestamp_ms = base_timestamp, .snapshot_id = base_snapshot_id},
+ {.timestamp_ms = base_timestamp + std::chrono::milliseconds(1000),
+ .snapshot_id = main1_snapshot_id},
+ {.timestamp_ms = base_timestamp + std::chrono::milliseconds(2000),
+ .snapshot_id = main2_snapshot_id},
+ };
+
+ // Create refs
+ std::string branch_name = "b1";
+ metadata->refs[branch_name] = std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = branch_snapshot_id, .retention = SnapshotRef::Branch{}});
+
+ std::string fork_branch = "fork";
+ metadata->refs[fork_branch] = std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = fork2_snapshot_id, .retention = SnapshotRef::Branch{}});
+
+ return metadata;
+}
+
+// Helper to extract snapshot IDs from a vector of snapshots
+std::vector<int64_t> ExtractSnapshotIds(
+ const std::vector<std::shared_ptr<Snapshot>>& snapshots) {
+ std::vector<int64_t> ids;
+ ids.reserve(snapshots.size());
+ for (const auto& snapshot : snapshots) {
+ ids.push_back(snapshot->snapshot_id);
+ }
+ return ids;
+}
+
+} // namespace
+
+class SnapshotUtilTest : public ::testing::Test {
+ protected:
+ void SetUp() override {
+ base_timestamp_ = TimePointMs{std::chrono::milliseconds(1000000000000)};
+ base_snapshot_id_ = 100;
+ main1_snapshot_id_ = 101;
+ main2_snapshot_id_ = 102;
+ branch_snapshot_id_ = 200;
+ fork0_snapshot_id_ = 300;
+ fork1_snapshot_id_ = 301;
+ fork2_snapshot_id_ = 302;
+
+ auto metadata = CreateTableMetadataWithSnapshots(
+ base_snapshot_id_, main1_snapshot_id_, main2_snapshot_id_, branch_snapshot_id_,
+ fork0_snapshot_id_, fork1_snapshot_id_, fork2_snapshot_id_, base_timestamp_);
+
+ TableIdentifier table_ident{.ns = {}, .name = "test"};
+ auto io = std::make_shared<MockFileIO>();
+ auto catalog = std::make_shared<MockCatalog>();
+ table_ = std::move(Table::Make(table_ident, std::move(metadata),
+ "s3://bucket/test/metadata.json", io, catalog)
+ .value());
+ }
+
+ TimePointMs base_timestamp_;
+ int64_t base_snapshot_id_;
+ int64_t main1_snapshot_id_;
+ int64_t main2_snapshot_id_;
+ int64_t branch_snapshot_id_;
+ int64_t fork0_snapshot_id_;
+ int64_t fork1_snapshot_id_;
+ int64_t fork2_snapshot_id_;
+ std::shared_ptr<Table> table_;
+};
+
+TEST_F(SnapshotUtilTest, IsParentAncestorOf) {
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto result1,
+ SnapshotUtil::IsParentAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_));
+ EXPECT_TRUE(result1);
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto result2,
+ SnapshotUtil::IsParentAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_));
+ EXPECT_FALSE(result2);
+
+ // fork2's parent is fork1, fork1's parent is fork0 (expired)
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto result3,
+ SnapshotUtil::IsParentAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_));
+ EXPECT_TRUE(result3);
+}
+
+TEST_F(SnapshotUtilTest, IsAncestorOf) {
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto result1,
+ SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_, base_snapshot_id_));
+ EXPECT_TRUE(result1);
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto result2,
+ SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_, main1_snapshot_id_));
+ EXPECT_FALSE(result2);
+
+ // fork2 -> fork1 -> fork0 (expired, not in snapshots)
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto result3,
+ SnapshotUtil::IsAncestorOf(*table_, fork2_snapshot_id_, fork0_snapshot_id_));
+ EXPECT_FALSE(result3); // fork0 is expired, so not found
+
+ // Test with current snapshot
+ ICEBERG_UNWRAP_OR_FAIL(auto result4,
+ SnapshotUtil::IsAncestorOf(*table_, main1_snapshot_id_));
+ EXPECT_TRUE(result4);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto result5,
+ SnapshotUtil::IsAncestorOf(*table_, branch_snapshot_id_));
+ EXPECT_FALSE(result5);
+}
+
+TEST_F(SnapshotUtilTest, CurrentAncestors) {
+ ICEBERG_UNWRAP_OR_FAIL(auto ancestors, SnapshotUtil::CurrentAncestors(*table_));
+ auto ids = ExtractSnapshotIds(ancestors);
+ EXPECT_EQ(ids, std::vector<int64_t>(
+ {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_}));
+
+ ICEBERG_UNWRAP_OR_FAIL(auto ancestor_ids, SnapshotUtil::CurrentAncestorIds(*table_));
+ EXPECT_EQ(ancestor_ids, std::vector<int64_t>({main2_snapshot_id_, main1_snapshot_id_,
+ base_snapshot_id_}));
+}
+
+TEST_F(SnapshotUtilTest, OldestAncestor) {
+ ICEBERG_UNWRAP_OR_FAIL(auto oldest, SnapshotUtil::OldestAncestor(*table_));
+ ASSERT_TRUE(oldest.has_value());
+ EXPECT_EQ(oldest.value()->snapshot_id, base_snapshot_id_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto oldest_of_main2,
+ SnapshotUtil::OldestAncestorOf(*table_, main2_snapshot_id_));
+ ASSERT_TRUE(oldest_of_main2.has_value());
+ EXPECT_EQ(oldest_of_main2.value()->snapshot_id, base_snapshot_id_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto oldest_after,
+ SnapshotUtil::OldestAncestorAfter(
+ *table_, base_timestamp_ + std::chrono::milliseconds(1)));
+ ASSERT_TRUE(oldest_after.has_value());
+ EXPECT_EQ(oldest_after.value()->snapshot_id, main1_snapshot_id_);
+}
+
+TEST_F(SnapshotUtilTest, SnapshotsBetween) {
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto snapshot_ids,
+ SnapshotUtil::SnapshotIdsBetween(*table_, base_snapshot_id_, main2_snapshot_id_));
+ EXPECT_EQ(snapshot_ids, std::vector<int64_t>({main2_snapshot_id_, main1_snapshot_id_}));
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto ancestors_between1,
+ SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, main1_snapshot_id_));
+ auto ids1 = ExtractSnapshotIds(ancestors_between1);
+ EXPECT_EQ(ids1, std::vector<int64_t>({main2_snapshot_id_}));
+
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto ancestors_between2,
+ SnapshotUtil::AncestorsBetween(*table_, main2_snapshot_id_, branch_snapshot_id_));
+ auto ids2 = ExtractSnapshotIds(ancestors_between2);
+ EXPECT_EQ(ids2, std::vector<int64_t>(
+ {main2_snapshot_id_, main1_snapshot_id_, base_snapshot_id_}));
+}
+
+TEST_F(SnapshotUtilTest, AncestorsOf) {
+ // Test ancestors of fork2: fork2 -> fork1 (fork0 is expired, not in snapshots)
+ ICEBERG_UNWRAP_OR_FAIL(auto ancestors,
+ SnapshotUtil::AncestorsOf(*table_, fork2_snapshot_id_));
+ auto ids = ExtractSnapshotIds(ancestors);
+ EXPECT_EQ(ids, std::vector<int64_t>({fork2_snapshot_id_, fork1_snapshot_id_}));
+}
+
+TEST_F(SnapshotUtilTest, SchemaForRef) {
+ ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema());
+ ASSERT_NE(initial_schema, nullptr);
+
+ // Test with null/empty ref (main branch)
+ ICEBERG_UNWRAP_OR_FAIL(auto schema1, SnapshotUtil::SchemaFor(*table_, ""));
+ EXPECT_EQ(schema1->fields().size(), initial_schema->fields().size());
+
+ // Test with non-existing ref
+ ICEBERG_UNWRAP_OR_FAIL(auto schema2,
+ SnapshotUtil::SchemaFor(*table_, "non-existing-ref"));
+ EXPECT_EQ(schema2->fields().size(), initial_schema->fields().size());
+
+ // Test with main branch
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto schema3,
+ SnapshotUtil::SchemaFor(*table_, std::string(SnapshotRef::kMainBranch)));
+ EXPECT_EQ(schema3->fields().size(), initial_schema->fields().size());
+}
+
+TEST_F(SnapshotUtilTest, SchemaForBranch) {
+ ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema());
+ ASSERT_NE(initial_schema, nullptr);
+
+ std::string branch = "b1";
+ ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, branch));
+ // Branch should return current schema (not snapshot schema)
+ EXPECT_EQ(schema->fields().size(), initial_schema->fields().size());
+}
+
+TEST_F(SnapshotUtilTest, SchemaForTag) {
+ // Create a tag pointing to base snapshot
+ auto metadata = table_->metadata();
+ std::string tag = "tag1";
+ metadata->refs[tag] = std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = base_snapshot_id_, .retention = SnapshotRef::Tag{}});
+
+ ICEBERG_UNWRAP_OR_FAIL(auto initial_schema, table_->schema());
+ ASSERT_NE(initial_schema, nullptr);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto schema, SnapshotUtil::SchemaFor(*table_, tag));
+ // Tag should return the schema of the snapshot it points to
+ // Since base snapshot has schema_id = 0, it should return the same schema
+ EXPECT_EQ(schema->fields().size(), initial_schema->fields().size());
+}
+
+TEST_F(SnapshotUtilTest, SnapshotAfter) {
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after,
+ SnapshotUtil::SnapshotAfter(*table_, base_snapshot_id_));
+ EXPECT_EQ(snapshot_after->snapshot_id, main1_snapshot_id_);
+
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_after_main1,
+ SnapshotUtil::SnapshotAfter(*table_, main1_snapshot_id_));
+ EXPECT_EQ(snapshot_after_main1->snapshot_id, main2_snapshot_id_);
+}
+
+TEST_F(SnapshotUtilTest, SnapshotIdAsOfTime) {
+ // Test with timestamp before any snapshot
+ auto early_timestamp = base_timestamp_ - std::chrono::milliseconds(1000);
+ auto snapshot_id = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, early_timestamp);
+ EXPECT_FALSE(snapshot_id.has_value());
+
+ // Test with timestamp at base snapshot
+ auto snapshot_id1 = SnapshotUtil::OptionalSnapshotIdAsOfTime(*table_, base_timestamp_);
+ ASSERT_TRUE(snapshot_id1.has_value());
+ EXPECT_EQ(snapshot_id1.value(), base_snapshot_id_);
+
+ // Test with timestamp between main1 and main2
+ auto mid_timestamp = base_timestamp_ + std::chrono::milliseconds(1500);
+ ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id2,
+ SnapshotUtil::SnapshotIdAsOfTime(*table_, mid_timestamp));
+ EXPECT_EQ(snapshot_id2, main1_snapshot_id_);
+}
+
+TEST_F(SnapshotUtilTest, LatestSnapshot) {
+ // Test main branch
+ ICEBERG_UNWRAP_OR_FAIL(
+ auto main_snapshot,
+ SnapshotUtil::LatestSnapshot(*table_, std::string(SnapshotRef::kMainBranch)));
+ EXPECT_EQ(main_snapshot->snapshot_id, main2_snapshot_id_);
+
+ // Test branch
+ ICEBERG_UNWRAP_OR_FAIL(auto branch_snapshot,
+ SnapshotUtil::LatestSnapshot(*table_, "b1"));
+ EXPECT_EQ(branch_snapshot->snapshot_id, branch_snapshot_id_);
+
+ // Test non-existing branch
+ ICEBERG_UNWRAP_OR_FAIL(auto non_existing,
+ SnapshotUtil::LatestSnapshot(*table_, "non-existing"));
+ // Should return current snapshot
+ EXPECT_EQ(non_existing->snapshot_id, main2_snapshot_id_);
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/type.cc b/src/iceberg/type.cc
index 1cd5fb3e..44512c0d 100644
--- a/src/iceberg/type.cc
+++ b/src/iceberg/type.cc
@@ -141,9 +141,9 @@ StructType::InitFieldByLowerCaseName(const StructType& self) {
}
ListType::ListType(SchemaField element) : element_(std::move(element)) {
- ICEBERG_CHECK(element_.name() == kElementName,
- "ListType: child field name should be '{}', was '{}'", kElementName,
- element_.name());
+ ICEBERG_CHECK_OR_DIE(element_.name() == kElementName,
+ "ListType: child field name should be '{}', was '{}'",
+ kElementName, element_.name());
}
ListType::ListType(int32_t field_id, std::shared_ptr<Type> type, bool optional)
@@ -200,12 +200,12 @@ bool ListType::Equals(const Type& other) const {
MapType::MapType(SchemaField key, SchemaField value)
: fields_{std::move(key), std::move(value)} {
- ICEBERG_CHECK(this->key().name() == kKeyName,
- "MapType: key field name should be '{}', was '{}'", kKeyName,
- this->key().name());
- ICEBERG_CHECK(this->value().name() == kValueName,
- "MapType: value field name should be '{}', was '{}'", kValueName,
- this->value().name());
+ ICEBERG_CHECK_OR_DIE(this->key().name() == kKeyName,
+ "MapType: key field name should be '{}', was '{}'", kKeyName,
+ this->key().name());
+ ICEBERG_CHECK_OR_DIE(this->value().name() == kValueName,
+ "MapType: value field name should be '{}', was '{}'", kValueName,
+ this->value().name());
}
const SchemaField& MapType::key() const { return fields_[0]; }
@@ -292,8 +292,8 @@ bool DoubleType::Equals(const Type& other) const { return other.type_id() == kTy
DecimalType::DecimalType(int32_t precision, int32_t scale)
: precision_(precision), scale_(scale) {
- ICEBERG_CHECK(precision >= 0 && precision <= kMaxPrecision,
- "DecimalType: precision must be in [0, 38], was {}", precision);
+ ICEBERG_CHECK_OR_DIE(precision >= 0 && precision <= kMaxPrecision,
+ "DecimalType: precision must be in [0, 38], was {}", precision);
}
int32_t DecimalType::precision() const { return precision_; }
@@ -341,7 +341,7 @@ std::string UuidType::ToString() const { return "uuid"; }
bool UuidType::Equals(const Type& other) const { return other.type_id() == kTypeId; }
FixedType::FixedType(int32_t length) : length_(length) {
- ICEBERG_CHECK(length >= 0, "FixedType: length must be >= 0, was {}", length);
+ ICEBERG_CHECK_OR_DIE(length >= 0, "FixedType: length must be >= 0, was {}", length);
}
int32_t FixedType::length() const { return length_; }
diff --git a/src/iceberg/util/decimal.cc b/src/iceberg/util/decimal.cc
index f33d9328..433d3586 100644
--- a/src/iceberg/util/decimal.cc
+++ b/src/iceberg/util/decimal.cc
@@ -300,8 +300,8 @@ bool RescaleWouldCauseDataLoss(const Decimal& value, int32_t delta_scale,
Decimal::Decimal(std::string_view str) {
auto result = Decimal::FromString(str);
- ICEBERG_CHECK(result, "Failed to parse Decimal from string: {}, error: {}", str,
- result.error().message);
+ ICEBERG_CHECK_OR_DIE(result, "Failed to parse Decimal from string: {}, error: {}", str,
+ result.error().message);
*this = std::move(result.value());
}
diff --git a/src/iceberg/util/macros.h b/src/iceberg/util/macros.h
index 50ac13f2..c6919ab7 100644
--- a/src/iceberg/util/macros.h
+++ b/src/iceberg/util/macros.h
@@ -42,8 +42,25 @@
ICEBERG_ASSIGN_OR_RAISE_IMPL(ICEBERG_ASSIGN_OR_RAISE_NAME(result_, __COUNTER__), lhs, \
rexpr)
+// Macro for debug checks
#define ICEBERG_DCHECK(expr, message) assert((expr) && (message))
+// Macro for precondition checks, usually used for function arguments
+#define ICEBERG_PRECHECK(expr, ...) \
+ do { \
+ if (!(expr)) [[unlikely]] { \
+ return InvalidArgument(__VA_ARGS__); \
+ } \
+ } while (0)
+
+// Macro for state checks, usually used for unexpected states
+#define ICEBERG_CHECK(expr, ...) \
+ do { \
+ if (!(expr)) [[unlikely]] { \
+ return Invalid(__VA_ARGS__); \
+ } \
+ } while (0)
+
#define ERROR_TO_EXCEPTION(error) \
if (error.kind == iceberg::ErrorKind::kInvalidExpression) { \
throw iceberg::ExpressionError(error.message); \
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
new file mode 100644
index 00000000..1243a109
--- /dev/null
+++ b/src/iceberg/util/snapshot_util.cc
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <ranges>
+
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+// Shorthand for handling a failed Result whose error may be NotFound.
+//
+// If `result` holds a value, nothing happens. If it holds a NotFound error,
+// `action` is executed; `action` is expected to leave the current scope
+// (`return ...` or `break`). Any other error -- or a NotFound error whose
+// `action` falls through -- is returned to the caller.
+//
+// The expansion is deliberately a bare `if` instead of `do { } while (0)`:
+// a `break` action has to apply to the caller's enclosing loop. Do not use
+// this macro as the unbraced body of an `if` that has an `else`.
+#define ICEBERG_ACTION_FOR_NOT_FOUND(result, action)      \
+  if (!(result).has_value()) [[unlikely]] {               \
+    if ((result).error().kind == ErrorKind::kNotFound) {  \
+      action;                                             \
+    }                                                     \
+    return std::unexpected<Error>((result).error());      \
+  }
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, int64_t snapshot_id) {
+  // Resolve the starting snapshot; a lookup error is propagated to the caller.
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(snapshot_id));
+  return AncestorsOf(table, start);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id,
+                                        int64_t ancestor_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(table, snapshot_id));
+
+  // True for a non-null snapshot whose ID equals the ID being searched for.
+  const auto is_target = [ancestor_snapshot_id](const auto& candidate) {
+    return candidate != nullptr && candidate->snapshot_id == ancestor_snapshot_id;
+  };
+  return std::ranges::any_of(lineage, is_target);
+}
+
+Result<bool> SnapshotUtil::IsAncestorOf(const Table& table,
+                                        int64_t ancestor_snapshot_id) {
+  // Start the search at the table's current snapshot.
+  ICEBERG_ASSIGN_OR_RAISE(auto head, table.current_snapshot());
+  ICEBERG_CHECK(head != nullptr, "Current snapshot is null");
+
+  const auto head_id = head->snapshot_id;
+  return IsAncestorOf(table, head_id, ancestor_snapshot_id);
+}
+
+Result<bool> SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id,
+                                              int64_t ancestor_parent_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(table, snapshot_id));
+
+  // Comparing the optional parent ID with a plain ID is false when the
+  // optional is empty, so no separate has_value() test is needed.
+  const auto has_target_parent = [ancestor_parent_snapshot_id](const auto& candidate) {
+    return candidate != nullptr &&
+           candidate->parent_snapshot_id == ancestor_parent_snapshot_id;
+  };
+  return std::ranges::any_of(lineage, has_target_parent);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::CurrentAncestors(
+    const Table& table) {
+  auto head = table.current_snapshot();
+  if (!head.has_value()) [[unlikely]] {
+    if (head.error().kind == ErrorKind::kNotFound) {
+      // No current snapshot: report an empty ancestor list, not an error.
+      return {};
+    }
+    return std::unexpected<Error>(head.error());
+  }
+  return AncestorsOf(table, *head);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::CurrentAncestorIds(const Table& table) {
+  // Collect the ancestor snapshots first, then reduce them to their IDs.
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, CurrentAncestors(table));
+  return ToIds(lineage);
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestor(
+    const Table& table) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, CurrentAncestors(table));
+  // The ancestor list starts at the current snapshot, so the oldest known
+  // ancestor is its last element.
+  if (!lineage.empty()) {
+    return lineage.back();
+  }
+  return std::nullopt;
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorOf(
+    const Table& table, int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, AncestorsOf(table, snapshot_id));
+  // The ancestor list starts at `snapshot_id`, so the oldest known ancestor is
+  // its last element.
+  if (!lineage.empty()) {
+    return lineage.back();
+  }
+  return std::nullopt;
+}
+
+Result<std::optional<std::shared_ptr<Snapshot>>> SnapshotUtil::OldestAncestorAfter(
+    const Table& table, TimePointMs timestamp_ms) {
+  auto current_result = table.current_snapshot();
+  // Without a current snapshot there is nothing to search.
+  ICEBERG_ACTION_FOR_NOT_FOUND(current_result, { return std::nullopt; });
+  auto current = std::move(current_result.value());
+
+  // Walk the ancestor chain starting at the current snapshot and remember the
+  // last snapshot visited whose timestamp is still greater than `timestamp_ms`.
+  std::optional<std::shared_ptr<Snapshot>> last_snapshot = std::nullopt;
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current));
+  for (const auto& snapshot : ancestors) {
+    const auto snapshot_timestamp_ms = snapshot->timestamp_ms;
+    if (snapshot_timestamp_ms < timestamp_ms) {
+      // This snapshot is already too old; the previously visited one (if any)
+      // is the answer.
+      return last_snapshot;
+    }
+    if (snapshot_timestamp_ms == timestamp_ms) {
+      return snapshot;
+    }
+    // `snapshot` is a const reference into `ancestors`, so this copies the
+    // shared_ptr; the vector keeps its own reference.
+    last_snapshot = snapshot;
+  }
+
+  if (last_snapshot.has_value() && last_snapshot.value() != nullptr &&
+      !last_snapshot.value()->parent_snapshot_id.has_value()) {
+    // this is the first snapshot in the table, return it
+    return last_snapshot;
+  }
+
+  // the first ancestor after the given time can't be determined
+  return NotFound("Cannot find snapshot older than {}", FormatTimestamp(timestamp_ms));
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::SnapshotIdsBetween(const Table& table,
+                                                              int64_t from_snapshot_id,
+                                                              int64_t to_snapshot_id) {
+  // Lookup that yields a null snapshot for from_snapshot_id, which ends the
+  // ancestor traversal just before it (from_snapshot_id is exclusive).
+  const auto stop_before_from =
+      [&table, from_snapshot_id](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+    if (id != from_snapshot_id) {
+      return table.SnapshotById(id);
+    }
+    return nullptr;
+  };
+
+  ICEBERG_ASSIGN_OR_RAISE(auto to_snapshot, table.SnapshotById(to_snapshot_id));
+  ICEBERG_ASSIGN_OR_RAISE(auto range, AncestorsOf(to_snapshot, stop_before_from));
+  return ToIds(range);
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::AncestorIdsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    const std::optional<int64_t>& oldest_snapshot_id) {
+  // Same traversal as AncestorsBetween, reduced to the snapshot IDs.
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto range, AncestorsBetween(table, latest_snapshot_id, oldest_snapshot_id));
+  return ToIds(range);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsBetween(
+    const Table& table, int64_t latest_snapshot_id,
+    std::optional<int64_t> oldest_snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto start, table.SnapshotById(latest_snapshot_id));
+
+  // No lower bound: return the full ancestor chain.
+  if (!oldest_snapshot_id.has_value()) {
+    return AncestorsOf(table, start);
+  }
+
+  const int64_t stop_id = *oldest_snapshot_id;
+  if (latest_snapshot_id == stop_id) {
+    // Both bounds are the same snapshot: the result is empty.
+    return {};
+  }
+
+  // Yield a null snapshot for the lower bound so the traversal ends before it.
+  const auto stop_before_oldest =
+      [&table, stop_id](int64_t id) -> Result<std::shared_ptr<Snapshot>> {
+    if (id != stop_id) {
+      return table.SnapshotById(id);
+    }
+    return nullptr;
+  };
+  return AncestorsOf(start, stop_before_oldest);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const Table& table, const std::shared_ptr<Snapshot>& snapshot) {
+  // Resolve every parent through the table's own snapshot lookup.
+  const auto lookup_in_table = [&table](int64_t id) { return table.SnapshotById(id); };
+  return AncestorsOf(snapshot, lookup_in_table);
+}
+
+Result<std::vector<std::shared_ptr<Snapshot>>> SnapshotUtil::AncestorsOf(
+    const std::shared_ptr<Snapshot>& snapshot,
+    const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup) {
+  ICEBERG_PRECHECK(snapshot != nullptr, "Snapshot is null");
+
+  std::vector<std::shared_ptr<Snapshot>> chain;
+  // Follow parent links until a snapshot has no parent, the lookup yields a
+  // null snapshot, or the lookup reports NotFound. Any other lookup error is
+  // returned to the caller.
+  for (std::shared_ptr<Snapshot> cursor = snapshot; cursor != nullptr;) {
+    chain.push_back(cursor);
+
+    const auto parent_id = cursor->parent_snapshot_id;
+    if (!parent_id.has_value()) {
+      break;
+    }
+
+    auto parent = lookup(parent_id.value());
+    ICEBERG_ACTION_FOR_NOT_FOUND(parent, { break; });
+    cursor = std::move(parent.value());
+  }
+
+  return chain;
+}
+
+Result<std::vector<int64_t>> SnapshotUtil::ToIds(
+    const std::vector<std::shared_ptr<Snapshot>>& snapshots) {
+  std::vector<int64_t> ids;
+  ids.reserve(snapshots.size());
+  // Null entries are skipped; the remaining IDs keep the input order.
+  for (const auto& snapshot : snapshots) {
+    if (snapshot != nullptr) {
+      ids.push_back(snapshot->snapshot_id);
+    }
+  }
+  return ids;
+}
+
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::SnapshotAfter(const Table& table,
+                                                              int64_t snapshot_id) {
+  // The given ID must resolve to a snapshot before its child is searched for.
+  ICEBERG_ASSIGN_OR_RAISE(auto parent, table.SnapshotById(snapshot_id));
+  ICEBERG_CHECK(parent != nullptr, "Snapshot is null for id {}", snapshot_id);
+
+  // Find the first ancestor of the current snapshot whose parent is `snapshot_id`.
+  ICEBERG_ASSIGN_OR_RAISE(auto lineage, CurrentAncestors(table));
+  const auto child = std::ranges::find_if(lineage, [snapshot_id](const auto& candidate) {
+    return candidate != nullptr && candidate->parent_snapshot_id.has_value() &&
+           candidate->parent_snapshot_id.value() == snapshot_id;
+  });
+  if (child != lineage.end()) {
+    return *child;
+  }
+
+  return NotFound(
+      "Cannot find snapshot after {}: not an ancestor of table's current snapshot",
+      snapshot_id);
+}
+
+Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const Table& table,
+                                                 TimePointMs timestamp_ms) {
+  // Same lookup as OptionalSnapshotIdAsOfTime, but a missing match is an error.
+  const auto match = OptionalSnapshotIdAsOfTime(table, timestamp_ms);
+  ICEBERG_CHECK(match.has_value(), "Cannot find a snapshot older than {}",
+                FormatTimestamp(timestamp_ms));
+  return *match;
+}
+
+std::optional<int64_t> SnapshotUtil::OptionalSnapshotIdAsOfTime(
+    const Table& table, TimePointMs timestamp_ms) {
+  // Scan the whole history; the last entry (in iteration order) whose
+  // timestamp is not after `timestamp_ms` wins.
+  std::optional<int64_t> latest_match;
+  for (const auto& entry : table.history()) {
+    if (entry.timestamp_ms > timestamp_ms) {
+      continue;
+    }
+    latest_match = entry.snapshot_id;
+  }
+  return latest_match;
+}
+
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
+                                                        int64_t snapshot_id) {
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot, table.SnapshotById(snapshot_id));
+  ICEBERG_CHECK(snapshot != nullptr, "Snapshot is null for id {}", snapshot_id);
+
+  const auto& schema_id = snapshot->schema_id;
+  if (!schema_id.has_value()) {
+    // The snapshot records no schema ID; fall back to the table's schema.
+    // TODO(any): recover the schema by reading previous metadata files
+    return table.schema();
+  }
+
+  return table.metadata()->SchemaById(schema_id.value());
+}
+
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
+                                                        TimePointMs timestamp_ms) {
+  // Map the timestamp to a snapshot ID, then reuse the by-ID overload.
+  ICEBERG_ASSIGN_OR_RAISE(int64_t snapshot_id, SnapshotIdAsOfTime(table, timestamp_ms));
+  return SchemaFor(table, snapshot_id);
+}
+
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
+                                                        const std::string& ref) {
+  const bool is_main = ref.empty() || ref == SnapshotRef::kMainBranch;
+  if (!is_main) {
+    const auto& metadata = table.metadata();
+    const auto entry = metadata->refs.find(ref);
+    // Only an existing ref that is not a branch is resolved to its snapshot.
+    const bool is_non_branch_ref = entry != metadata->refs.cend() &&
+                                   entry->second->type() != SnapshotRefType::kBranch;
+    if (is_non_branch_ref) {
+      return SchemaFor(table, entry->second->snapshot_id);
+    }
+  }
+
+  // Main branch, unknown ref, or a branch ref: use the table's schema.
+  return table.schema();
+}
+
+Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const TableMetadata& metadata,
+                                                        const std::string& ref) {
+  // An empty name or the main branch name maps to the metadata's schema.
+  if (ref.empty() || ref == SnapshotRef::kMainBranch) {
+    return metadata.Schema();
+  }
+
+  // Unknown refs and branch refs also use the metadata's schema.
+  auto it = metadata.refs.find(ref);
+  if (it == metadata.refs.end() || it->second->type() == SnapshotRefType::kBranch) {
+    return metadata.Schema();
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata.SnapshotById(it->second->snapshot_id));
+  // Guard the dereference below, matching the check done by
+  // SchemaFor(const Table&, int64_t).
+  ICEBERG_CHECK(snapshot != nullptr, "Snapshot is null for id {}",
+                it->second->snapshot_id);
+  if (!snapshot->schema_id.has_value()) {
+    return metadata.Schema();
+  }
+
+  return metadata.SchemaById(snapshot->schema_id);
+}
+
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::LatestSnapshot(
+    const Table& table, const std::string& branch) {
+  // Forward to the TableMetadata overload using the table's metadata.
+  const auto& metadata = table.metadata();
+  return LatestSnapshot(*metadata, branch);
+}
+
+Result<std::shared_ptr<Snapshot>> SnapshotUtil::LatestSnapshot(
+    const TableMetadata& metadata, const std::string& branch) {
+  const bool is_main = branch.empty() || branch == SnapshotRef::kMainBranch;
+  if (!is_main) {
+    // A known ref resolves to the snapshot it points at.
+    if (const auto entry = metadata.refs.find(branch); entry != metadata.refs.end()) {
+      return metadata.SnapshotById(entry->second->snapshot_id);
+    }
+  }
+
+  // Main branch or unknown ref: use the metadata's own snapshot accessor.
+  return metadata.Snapshot();
+}
+
+} // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util_internal.h
b/src/iceberg/util/snapshot_util_internal.h
new file mode 100644
index 00000000..e0d8830f
--- /dev/null
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <functional>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/util/timepoint.h"
+
+namespace iceberg {
+
+/// \brief Utility functions for working with snapshots
+/// \note All the returned std::shared_ptr<Snapshot> are guaranteed to be not
null.
+class ICEBERG_EXPORT SnapshotUtil {
+ public:
+ /// \brief Returns a vector of ancestors of the given snapshot.
+ ///
+ /// \param table The table
+ /// \param snapshot_id The snapshot ID to start from
+ /// \return A vector of ancestor snapshots
+ static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(const
Table& table,
+ int64_t
snapshot_id);
+
+ /// \brief Returns whether ancestor_snapshot_id is an ancestor of
snapshot_id.
+ ///
+ /// \param table The table to check
+ /// \param snapshot_id The snapshot ID to check
+ /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+ /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id
+ static Result<bool> IsAncestorOf(const Table& table, int64_t snapshot_id,
+ int64_t ancestor_snapshot_id);
+
+ /// \brief Returns whether ancestor_snapshot_id is an ancestor of the
table's current
+ /// state.
+ ///
+ /// \param table The table to check
+ /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+ /// \return true if ancestor_snapshot_id is an ancestor of the current
snapshot
+ static Result<bool> IsAncestorOf(const Table& table, int64_t
ancestor_snapshot_id);
+
+ /// \brief Returns whether some ancestor of snapshot_id has a parent ID that matches
+ /// ancestor_parent_snapshot_id.
+ ///
+ /// \param table The table to check
+ /// \param snapshot_id The snapshot ID to check
+ /// \param ancestor_parent_snapshot_id The ancestor parent snapshot ID to
check for
+ /// \return true if any ancestor has the given parent ID
+ static Result<bool> IsParentAncestorOf(const Table& table, int64_t
snapshot_id,
+ int64_t ancestor_parent_snapshot_id);
+
+ /// \brief Returns a vector that traverses the table's snapshots from the
current to the
+ /// last known ancestor.
+ ///
+ /// \param table The table
+ /// \return A vector from the table's current snapshot to its last known
ancestor
+ static Result<std::vector<std::shared_ptr<Snapshot>>> CurrentAncestors(
+ const Table& table);
+
+ /// \brief Returns the snapshot IDs for the ancestors of the current table
state.
+ ///
+ /// Ancestor IDs are ordered by commit time, descending. The first ID is the
current
+ /// snapshot, followed by its parent, and so on.
+ ///
+ /// \param table The table
+ /// \return A vector of snapshot IDs of the known ancestor snapshots,
including the
+ /// current ID
+ static Result<std::vector<int64_t>> CurrentAncestorIds(const Table& table);
+
+ /// \brief Traverses the history of the table's current snapshot and finds
the oldest
+ /// Snapshot.
+ ///
+ /// \param table The table
+ /// \return The oldest snapshot, or nullopt if there is no current snapshot
+ static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestor(
+ const Table& table);
+
+ /// \brief Traverses the history and finds the oldest ancestor of the
specified
+ /// snapshot.
+ ///
+ /// Oldest ancestor is defined as the ancestor snapshot whose parent is null
or has been
+ /// expired. If the specified snapshot has no parent or parent has been
expired, the
+ /// specified snapshot itself is returned.
+ ///
+ /// \param table The table
+ /// \param snapshot_id The ID of the snapshot to find the oldest ancestor
+ /// \return The oldest snapshot, or nullopt if not found
+ static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestorOf(
+ const Table& table, int64_t snapshot_id);
+
+ /// \brief Traverses the history of the table's current snapshot, finds the
oldest
+ /// snapshot that was committed either at or after a given time.
+ ///
+ /// \param table The table
+ /// \param timestamp_ms A timestamp in milliseconds
+ /// \return The first snapshot after the given timestamp, or nullopt if the
current
+ /// snapshot is older than the timestamp. If the first ancestor after the
given time
+ /// can't be determined, returns a NotFound error.
+ static Result<std::optional<std::shared_ptr<Snapshot>>> OldestAncestorAfter(
+ const Table& table, TimePointMs timestamp_ms);
+
+ /// \brief Returns list of snapshot ids in the range
(from_snapshot_id,to_snapshot_id]
+ ///
+ /// This method assumes that from_snapshot_id is an ancestor of
to_snapshot_id.
+ ///
+ /// \param table The table
+ /// \param from_snapshot_id The starting snapshot ID (exclusive)
+ /// \param to_snapshot_id The ending snapshot ID (inclusive)
+ /// \return A vector of snapshot IDs in the range
+ static Result<std::vector<int64_t>> SnapshotIdsBetween(const Table& table,
+ int64_t
from_snapshot_id,
+ int64_t
to_snapshot_id);
+
+ /// \brief Returns a vector of ancestor IDs between two snapshots.
+ ///
+ /// \param table The table
+ /// \param latest_snapshot_id The latest snapshot ID
+ /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt
means all
+ /// ancestors)
+ /// \return A vector of snapshot IDs between the two snapshots
+ static Result<std::vector<int64_t>> AncestorIdsBetween(
+ const Table& table, int64_t latest_snapshot_id,
+ const std::optional<int64_t>& oldest_snapshot_id);
+
+ /// \brief Returns a vector of ancestors between two snapshots.
+ ///
+ /// \param table The table
+ /// \param latest_snapshot_id The latest snapshot ID
+ /// \param oldest_snapshot_id The oldest snapshot ID (optional, nullopt
means all
+ /// ancestors)
+ /// \return A vector of ancestor snapshots between the two snapshots
+ static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsBetween(
+ const Table& table, int64_t latest_snapshot_id,
+ std::optional<int64_t> oldest_snapshot_id);
+
+ /// \brief Traverses the history of the table's current snapshot and finds
the snapshot
+ /// with the given snapshot id as its parent.
+ ///
+ /// \param table The table
+ /// \param snapshot_id The parent snapshot ID
+ /// \return The snapshot for which the given snapshot is the parent
+ static Result<std::shared_ptr<Snapshot>> SnapshotAfter(const Table& table,
+ int64_t snapshot_id);
+
+ /// \brief Returns the ID of the most recent snapshot for the table as of
the timestamp.
+ ///
+ /// \param table The table
+ /// \param timestamp_ms The timestamp in millis since the Unix epoch
+ /// \return The snapshot ID
+ static Result<int64_t> SnapshotIdAsOfTime(const Table& table, TimePointMs
timestamp_ms);
+
+ /// \brief Returns the ID of the most recent snapshot for the table as of
the timestamp,
+ /// or nullopt if not found.
+ ///
+ /// \param table The table
+ /// \param timestamp_ms The timestamp in millis since the Unix epoch
+ /// \return The snapshot ID, or nullopt if not found
+ static std::optional<int64_t> OptionalSnapshotIdAsOfTime(const Table& table,
+ TimePointMs
timestamp_ms);
+
+ /// \brief Returns the schema of the table for the specified snapshot.
+ ///
+ /// \param table The table
+ /// \param snapshot_id The ID of the snapshot
+ /// \return The schema
+ static Result<std::shared_ptr<Schema>> SchemaFor(const Table& table,
+ int64_t snapshot_id);
+
+ /// \brief Returns the schema of the table for the specified timestamp.
+ ///
+ /// \param table The table
+ /// \param timestamp_ms The timestamp in millis since the Unix epoch
+ /// \return The schema
+ static Result<std::shared_ptr<Schema>> SchemaFor(const Table& table,
+ TimePointMs timestamp_ms);
+
+ /// \brief Return the schema of the snapshot at a given ref.
+ ///
+ /// If the ref does not exist or the ref is a branch, the table schema is
returned
+ /// because it will be the schema when the new branch is created. If the ref
is a tag,
+ /// then the snapshot schema is returned.
+ ///
+ /// \param table The table
+ /// \param ref Ref name of the table (empty string means main branch)
+ /// \return Schema of the specific snapshot at the given ref
+ static Result<std::shared_ptr<Schema>> SchemaFor(const Table& table,
+ const std::string& ref);
+
+ /// \brief Return the schema of the snapshot at a given ref.
+ ///
+ /// If the ref does not exist or the ref is a branch, the table schema is
returned
+ /// because it will be the schema when the new branch is created. If the ref
is a tag,
+ /// then the snapshot schema is returned.
+ ///
+ /// \param metadata The table metadata
+ /// \param ref Ref name of the table (empty string means main branch)
+ /// \return Schema of the specific snapshot at the given ref
+ static Result<std::shared_ptr<Schema>> SchemaFor(const TableMetadata&
metadata,
+ const std::string& ref);
+
+ /// \brief Fetch the snapshot at the head of the given branch in the given
table.
+ ///
+ /// This method delegates to the TableMetadata overload, which for the main branch
+ /// calls TableMetadata::Snapshot() instead of using the branch API so that existing
+ /// code still goes through the old code path to ensure backwards compatibility.
+ ///
+ /// \param table The table
+ /// \param branch Branch name of the table (empty string means main branch)
+ /// \return The latest snapshot for the given branch
+ static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const Table& table,
+ const std::string&
branch);
+
+ /// \brief Fetch the snapshot at the head of the given branch in the given
table.
+ ///
+ /// This method calls TableMetadata::Snapshot() instead of using branch API
for the main
+ /// branch so that existing code still goes through the old code path to
ensure
+ /// backwards compatibility.
+ ///
+ /// If the branch does not exist, the table's current snapshot is returned instead
+ /// (presumably because a newly created branch would start from it).
+ ///
+ /// \param metadata The table metadata
+ /// \param branch Branch name of the table metadata (empty string means main
+ /// branch)
+ /// \return The latest snapshot for the given branch
+ static Result<std::shared_ptr<Snapshot>> LatestSnapshot(const TableMetadata&
metadata,
+ const std::string&
branch);
+
+ private:
+ /// \brief Helper function to traverse ancestors of a snapshot.
+ ///
+ /// \param table The table
+ /// \param snapshot The snapshot to start from
+ /// \return A vector of ancestor snapshots
+ static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
+ const Table& table, const std::shared_ptr<Snapshot>& snapshot);
+
+ /// \brief Helper function to traverse ancestors of a snapshot using a
lookup function.
+ ///
+ /// \param snapshot The snapshot to start from
+ /// \param lookup Function to lookup snapshots by ID
+ /// \return A vector of ancestor snapshots
+ static Result<std::vector<std::shared_ptr<Snapshot>>> AncestorsOf(
+ const std::shared_ptr<Snapshot>& snapshot,
+ const std::function<Result<std::shared_ptr<Snapshot>>(int64_t)>& lookup);
+
+ /// \brief Helper function to convert snapshots to IDs.
+ ///
+ /// \param snapshots The snapshots
+ /// \return A vector of snapshot IDs
+ static Result<std::vector<int64_t>> ToIds(
+ const std::vector<std::shared_ptr<Snapshot>>& snapshots);
+};
+
+} // namespace iceberg
diff --git a/src/iceberg/util/timepoint.cc b/src/iceberg/util/timepoint.cc
index 6438e8e9..a8bc7708 100644
--- a/src/iceberg/util/timepoint.cc
+++ b/src/iceberg/util/timepoint.cc
@@ -20,6 +20,8 @@
#include "iceberg/util/timepoint.h"
#include <chrono>
+#include <iomanip>
+#include <sstream>
namespace iceberg {
@@ -43,4 +45,22 @@ int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns) {
.count();
}
+std::string FormatTimestamp(TimePointMs time_point_ns) {
+  // Split into whole seconds and a millisecond remainder using floor semantics so
+  // that pre-epoch (negative) timestamps still yield a remainder in [0, 999].
+  const std::chrono::milliseconds since_epoch(UnixMsFromTimePointMs(time_point_ns));
+  const auto whole_seconds = std::chrono::floor<std::chrono::seconds>(since_epoch);
+  const auto millis = (since_epoch - whole_seconds).count();
+
+  // std::put_time needs a std::tm, so go through time_t (second resolution).
+  const auto seconds_since_epoch = std::chrono::system_clock::to_time_t(
+      std::chrono::system_clock::time_point(whole_seconds));
+
+  // Render as "YYYY-MM-DD HH:MM:SS.mmm UTC".
+  std::ostringstream oss;
+  oss << std::put_time(std::gmtime(&seconds_since_epoch), "%Y-%m-%d %H:%M:%S");
+  oss << "." << std::setfill('0') << std::setw(3) << millis << " UTC";
+  return oss.str();
+}
+
} // namespace iceberg
diff --git a/src/iceberg/util/timepoint.h b/src/iceberg/util/timepoint.h
index 53857875..48e630ae 100644
--- a/src/iceberg/util/timepoint.h
+++ b/src/iceberg/util/timepoint.h
@@ -46,4 +46,7 @@ ICEBERG_EXPORT Result<TimePointNs>
TimePointNsFromUnixNs(int64_t unix_ns);
/// \brief Returns a Unix timestamp in nanoseconds from a TimePointNs
ICEBERG_EXPORT int64_t UnixNsFromTimePointNs(TimePointNs time_point_ns);
+/// \brief Returns a human-readable string representation of a TimePointMs
+///
+/// The string has the form "YYYY-MM-DD HH:MM:SS.mmm UTC".
+/// \note The parameter is named time_point_ns, but its type is TimePointMs.
+ICEBERG_EXPORT std::string FormatTimestamp(TimePointMs time_point_ns);
+
} // namespace iceberg
diff --git a/src/iceberg/util/uuid.cc b/src/iceberg/util/uuid.cc
index 14256755..9322deb9 100644
--- a/src/iceberg/util/uuid.cc
+++ b/src/iceberg/util/uuid.cc
@@ -204,7 +204,7 @@ Result<Uuid> Uuid::FromBytes(std::span<const uint8_t>
bytes) {
}
uint8_t Uuid::operator[](size_t index) const {
- ICEBERG_CHECK(index < kLength, "UUID index out of range: {}", index);
+ ICEBERG_CHECK_OR_DIE(index < kLength, "UUID index out of range: {}", index);
return data_[index];
}