This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new cdf05d65 feat: implement IncrementalAppendScan (#590)
cdf05d65 is described below
commit cdf05d6586a322410789717d7c5c81b45c18ae78
Author: wzhuo <[email protected]>
AuthorDate: Fri Mar 27 10:31:59 2026 +0800
feat: implement IncrementalAppendScan (#590)
---
src/iceberg/manifest/manifest_list.h | 13 +-
src/iceberg/table_scan.cc | 223 ++++++++-
src/iceberg/table_scan.h | 18 +-
src/iceberg/test/CMakeLists.txt | 1 +
src/iceberg/test/incremental_append_scan_test.cc | 601 +++++++++++++++++++++++
src/iceberg/test/scan_test_base.h | 275 +++++++++++
src/iceberg/test/table_scan_test.cc | 131 +----
7 files changed, 1118 insertions(+), 144 deletions(-)
diff --git a/src/iceberg/manifest/manifest_list.h
b/src/iceberg/manifest/manifest_list.h
index 2f3185a1..65271368 100644
--- a/src/iceberg/manifest/manifest_list.h
+++ b/src/iceberg/manifest/manifest_list.h
@@ -230,7 +230,9 @@ struct ICEBERG_EXPORT ManifestFile {
kFirstRowIdFieldId, "first_row_id", int64(),
"Starting row ID to assign to new rows in ADDED data files");
- bool operator==(const ManifestFile& other) const = default;
+ bool operator==(const ManifestFile& other) const {
+ return manifest_path == other.manifest_path;
+ }
static const std::shared_ptr<StructType>& Type();
};
@@ -272,3 +274,12 @@ ICEBERG_EXPORT inline constexpr Result<ManifestContent>
ManifestContentFromString
}
} // namespace iceberg
+
+namespace std {
+template <>
+struct hash<iceberg::ManifestFile> {
+ size_t operator()(const iceberg::ManifestFile& manifest_file) const {
+ return std::hash<std::string>{}(manifest_file.manifest_path);
+ }
+};
+} // namespace std
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 937af94b..ed2ede70 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -171,6 +171,90 @@ Status TableScanContext::Validate() const {
return {};
}
+bool IsScanCurrentLineage(const TableScanContext& context) {
+ return !context.from_snapshot_id.has_value() &&
!context.to_snapshot_id.has_value();
+}
+
+Result<int64_t> ToSnapshotIdInclusive(const TableScanContext& context,
+ const TableMetadata& metadata) {
+ // Get the branch's current snapshot ID if branch is set
+ std::shared_ptr<Snapshot> branch_snapshot;
+ const std::string& branch = context.branch;
+ if (!branch.empty()) {
+ auto iter = metadata.refs.find(branch);
+ ICEBERG_CHECK(iter != metadata.refs.end() && iter->second != nullptr,
+ "Cannot find branch: {}", branch);
+ ICEBERG_ASSIGN_OR_RAISE(branch_snapshot,
+ metadata.SnapshotById(iter->second->snapshot_id));
+ }
+
+ if (context.to_snapshot_id.has_value()) {
+ int64_t to_snapshot_id_value = context.to_snapshot_id.value();
+
+ if (branch_snapshot != nullptr) {
+ // Validate `to_snapshot_id` is on the current branch
+ ICEBERG_ASSIGN_OR_RAISE(
+ bool is_ancestor,
+ SnapshotUtil::IsAncestorOf(metadata, branch_snapshot->snapshot_id,
+ to_snapshot_id_value));
+ ICEBERG_CHECK(is_ancestor,
+ "End snapshot is not a valid snapshot on the current
branch: {}",
+ branch);
+ }
+
+ return to_snapshot_id_value;
+ }
+
+ // If to_snapshot_id is not set, use branch's current snapshot if branch is
set
+ if (branch_snapshot != nullptr) {
+ return branch_snapshot->snapshot_id;
+ }
+
+ // Get current snapshot from table's current snapshot
+ std::shared_ptr<Snapshot> current_snapshot;
+ ICEBERG_ASSIGN_OR_RAISE(current_snapshot, metadata.Snapshot());
+ ICEBERG_CHECK(current_snapshot != nullptr,
+ "End snapshot is not set and table has no current snapshot");
+ return current_snapshot->snapshot_id;
+}
+
+Result<std::optional<int64_t>> FromSnapshotIdExclusive(const TableScanContext&
context,
+ const TableMetadata&
metadata,
+ int64_t
to_snapshot_id_inclusive) {
+ if (!context.from_snapshot_id.has_value()) {
+ return std::nullopt;
+ }
+
+ int64_t from_snapshot_id = context.from_snapshot_id.value();
+
+ // Validate `from_snapshot_id` is an ancestor of `to_snapshot_id_inclusive`
+ if (context.from_snapshot_id_inclusive) {
+ ICEBERG_ASSIGN_OR_RAISE(
+ bool is_ancestor,
+ SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive,
from_snapshot_id));
+ ICEBERG_CHECK(
+ is_ancestor,
+ "Starting snapshot (inclusive) {} is not an ancestor of end snapshot
{}",
+ from_snapshot_id, to_snapshot_id_inclusive);
+
+ // For inclusive behavior, return the parent snapshot ID (can be nullopt)
+ ICEBERG_ASSIGN_OR_RAISE(auto from_snapshot,
metadata.SnapshotById(from_snapshot_id));
+ return from_snapshot->parent_snapshot_id;
+ }
+
+ // Validate there is an ancestor of `to_snapshot_id_inclusive` where parent
is
+ // `from_snapshot_id`
+ ICEBERG_ASSIGN_OR_RAISE(bool is_parent_ancestor,
+ SnapshotUtil::IsParentAncestorOf(
+ metadata, to_snapshot_id_inclusive,
from_snapshot_id));
+ ICEBERG_CHECK(
+ is_parent_ancestor,
+ "Starting snapshot (exclusive) {} is not a parent ancestor of end
snapshot {}",
+ from_snapshot_id, to_snapshot_id_inclusive);
+
+ return from_snapshot_id;
+}
+
} // namespace internal
ScanTask::~ScanTask() = default;
@@ -340,10 +424,15 @@ TableScanBuilder<ScanType>&
TableScanBuilder<ScanType>::AsOfTime(
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
- [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
+ int64_t from_snapshot_id, bool inclusive)
requires IsIncrementalScan<ScanType>
{
- AddError(NotImplemented("Incremental scan is not implemented"));
+ if (inclusive) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
+
metadata_->SnapshotById(from_snapshot_id));
+ }
+ this->context_.from_snapshot_id = from_snapshot_id;
+ this->context_.from_snapshot_id_inclusive = inclusive;
return *this;
}
@@ -352,15 +441,20 @@ TableScanBuilder<ScanType>&
TableScanBuilder<ScanType>::FromSnapshot(
const std::string& ref, bool inclusive)
requires IsIncrementalScan<ScanType>
{
- AddError(NotImplemented("Incremental scan is not implemented"));
- return *this;
+ auto iter = metadata_->refs.find(ref);
+ ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}",
ref);
+ ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+ ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
+ "Ref {} is not a tag", ref);
+ return FromSnapshot(iter->second->snapshot_id, inclusive);
}
template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t
to_snapshot_id)
requires IsIncrementalScan<ScanType>
{
- AddError(NotImplemented("Incremental scan is not implemented"));
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
metadata_->SnapshotById(to_snapshot_id));
+ context_.to_snapshot_id = to_snapshot_id;
return *this;
}
@@ -368,8 +462,12 @@ template <typename ScanType>
TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const
std::string& ref)
requires IsIncrementalScan<ScanType>
{
- AddError(NotImplemented("Incremental scan is not implemented"));
- return *this;
+ auto iter = metadata_->refs.find(ref);
+ ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}",
ref);
+ ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+ ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
+ "Ref {} is not a tag", ref);
+ return ToSnapshot(iter->second->snapshot_id);
}
template <typename ScanType>
@@ -377,6 +475,11 @@ TableScanBuilder<ScanType>&
TableScanBuilder<ScanType>::UseBranch(
const std::string& branch)
requires IsIncrementalScan<ScanType>
{
+ auto iter = metadata_->refs.find(branch);
+ ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}",
branch);
+ ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch);
+ ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
+ "Ref {} is not a branch", branch);
context_.branch = branch;
return *this;
}
@@ -536,20 +639,109 @@ Result<std::vector<std::shared_ptr<FileScanTask>>>
DataTableScan::PlanFiles() co
return manifest_group->PlanFiles();
}
+// Friend function template for IncrementalScan that implements the shared
PlanFiles
+// logic. It resolves the from/to snapshot range from the scan context and
delegates
+// to the two-arg virtual PlanFiles() override in the concrete subclass.
+// Defined as a friend to access the protected two-arg PlanFiles().
+template <typename ScanTaskType>
+Result<std::vector<std::shared_ptr<ScanTaskType>>> ResolvePlanFiles(
+ const IncrementalScan<ScanTaskType>& scan) {
+ if (IsScanCurrentLineage(scan.context())) {
+ if (scan.metadata()->current_snapshot_id == kInvalidSnapshotId) {
+ return std::vector<std::shared_ptr<ScanTaskType>>{};
+ }
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ int64_t to_snapshot_id_inclusive,
+ internal::ToSnapshotIdInclusive(scan.context(), *scan.metadata()));
+ ICEBERG_ASSIGN_OR_RAISE(
+ std::optional<int64_t> from_snapshot_id_exclusive,
+ internal::FromSnapshotIdExclusive(scan.context(), *scan.metadata(),
+ to_snapshot_id_inclusive));
+
+ return scan.PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive);
+}
+
// IncrementalAppendScan implementation
Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
- [[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
- [[maybe_unused]] std::shared_ptr<Schema> schema,
- [[maybe_unused]] std::shared_ptr<FileIO> io,
- [[maybe_unused]] internal::TableScanContext context) {
- return NotImplemented("IncrementalAppendScan is not implemented");
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+ ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+ ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+ return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan(
+ std::move(metadata), std::move(schema), std::move(io),
std::move(context)));
+}
+
+Result<std::vector<std::shared_ptr<FileScanTask>>>
IncrementalAppendScan::PlanFiles()
+ const {
+ return ResolvePlanFiles<FileScanTask>(*this);
}
Result<std::vector<std::shared_ptr<FileScanTask>>>
IncrementalAppendScan::PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
- return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto ancestors_snapshots,
+ SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+ from_snapshot_id_exclusive));
+
+ std::vector<std::shared_ptr<Snapshot>> append_snapshots;
+ std::ranges::copy_if(ancestors_snapshots,
std::back_inserter(append_snapshots),
+ [](const auto& snapshot) {
+ return snapshot != nullptr &&
+ snapshot->Operation().has_value() &&
+ snapshot->Operation().value() ==
DataOperation::kAppend;
+ });
+ if (append_snapshots.empty()) {
+ return std::vector<std::shared_ptr<FileScanTask>>{};
+ }
+
+ std::unordered_set<int64_t> snapshot_ids;
+ std::ranges::transform(append_snapshots,
+ std::inserter(snapshot_ids, snapshot_ids.end()),
+ [](const auto& snapshot) { return
snapshot->snapshot_id; });
+
+ std::unordered_set<ManifestFile> data_manifests;
+ for (const auto& snapshot : append_snapshots) {
+ SnapshotCache snapshot_cache(snapshot.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
+ std::ranges::copy_if(manifests, std::inserter(data_manifests,
data_manifests.end()),
+ [&snapshot_ids](const ManifestFile& manifest) {
+ return
snapshot_ids.contains(manifest.added_snapshot_id);
+ });
+ }
+ if (data_manifests.empty()) {
+ return std::vector<std::shared_ptr<FileScanTask>>{};
+ }
+
+ TableMetadataCache metadata_cache(metadata_.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id,
metadata_cache.GetPartitionSpecsById());
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto manifest_group,
+ ManifestGroup::Make(
+ io_, schema_, specs_by_id,
+ std::vector<ManifestFile>(data_manifests.begin(),
data_manifests.end()), {}));
+
+ manifest_group->CaseSensitive(context_.case_sensitive)
+ .Select(ScanColumns())
+ .FilterData(filter())
+ .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
+ return entry.snapshot_id.has_value() &&
+ snapshot_ids.contains(entry.snapshot_id.value()) &&
+ entry.status == ManifestStatus::kAdded;
+ })
+ .IgnoreDeleted()
+ .ColumnsToKeepStats(context_.columns_to_keep_stats);
+
+ if (context_.ignore_residuals) {
+ manifest_group->IgnoreResiduals();
+ }
+
+ return manifest_group->PlanFiles();
}
// IncrementalChangelogScan implementation
@@ -562,6 +754,11 @@ Result<std::unique_ptr<IncrementalChangelogScan>>
IncrementalChangelogScan::Make
return NotImplemented("IncrementalChangelogScan is not implemented");
}
+Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
+IncrementalChangelogScan::PlanFiles() const {
+ return ResolvePlanFiles<ChangelogScanTask>(*this);
+}
+
Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
IncrementalChangelogScan::PlanFiles(std::optional<int64_t>
from_snapshot_id_exclusive,
int64_t to_snapshot_id_inclusive) const {
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index d5bf6b4a..b21adcca 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -29,6 +29,7 @@
#include "iceberg/arrow_c_data.h"
#include "iceberg/result.h"
+#include "iceberg/table_metadata.h"
#include "iceberg/type_fwd.h"
#include "iceberg/util/error_collector.h"
@@ -355,15 +356,11 @@ class ICEBERG_EXPORT DataTableScan : public TableScan {
/// \brief A base template class for incremental scans that read changes
between
/// snapshots, and return scan tasks of the specified type.
template <typename ScanTaskType>
-class ICEBERG_EXPORT IncrementalScan : public TableScan {
+class IncrementalScan : public TableScan {
public:
~IncrementalScan() override = default;
- /// \brief Plans the scan tasks by resolving manifests and data files.
- /// \return A Result containing scan tasks or an error.
- Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const {
- return NotImplemented("IncrementalScan::PlanFiles is not implemented");
- }
+ virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const
= 0;
protected:
virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
@@ -371,6 +368,11 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {
int64_t to_snapshot_id_inclusive) const = 0;
using TableScan::TableScan;
+
+ // Allow the free function ResolvePlanFiles to access protected members.
+ template <typename T>
+ friend Result<std::vector<std::shared_ptr<T>>> ResolvePlanFiles(
+ const IncrementalScan<T>& scan);
};
/// \brief A scan that reads data files added between snapshots (incremental
appends).
@@ -383,6 +385,8 @@ class ICEBERG_EXPORT IncrementalAppendScan : public
IncrementalScan<FileScanTask
~IncrementalAppendScan() override = default;
+ Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const
override;
+
protected:
Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
@@ -402,6 +406,8 @@ class ICEBERG_EXPORT IncrementalChangelogScan
~IncrementalChangelogScan() override = default;
+ Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles() const
override;
+
protected:
Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
std::optional<int64_t> from_snapshot_id_exclusive,
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index c802df8a..6492b9bb 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -173,6 +173,7 @@ if(ICEBERG_BUILD_BUNDLE)
USE_BUNDLE
SOURCES
file_scan_task_test.cc
+ incremental_append_scan_test.cc
table_scan_test.cc)
add_iceberg_test(table_update_test
diff --git a/src/iceberg/test/incremental_append_scan_test.cc
b/src/iceberg/test/incremental_append_scan_test.cc
new file mode 100644
index 00000000..559aedd8
--- /dev/null
+++ b/src/iceberg/test/incremental_append_scan_test.cc
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/test/scan_test_base.h"
+
+namespace iceberg {
+
+class IncrementalAppendScanTest : public ScanTestBase {};
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusive) {
+ auto version = GetParam();
+
+ // Create 3 snapshots, each appending one file
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+ auto snapshot_b =
+ MakeAppendSnapshot(version, 2000L, 1000L, 2L,
{"/path/to/file_b.parquet"});
+ auto snapshot_c =
+ MakeAppendSnapshot(version, 3000L, 2000L, 3L,
{"/path/to/file_c.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a, snapshot_b, snapshot_c}, 3000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 3000L, .retention =
SnapshotRef::Branch{}})}});
+
+ // Test: from_snapshot_inclusive(snapshot_a) should return 3 files (A, B, C)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(1000L, /*inclusive=*/true);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 3);
+ EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre(
+ "/path/to/file_a.parquet",
"/path/to/file_b.parquet",
+ "/path/to/file_c.parquet"));
+ }
+
+ // Test: from_snapshot_inclusive(snapshot_a).to_snapshot(snapshot_c) should
return 3
+ // files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot(3000L);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 3);
+ }
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithNonExistingRef) {
+ auto metadata = MakeTableMetadata({}, -1L);
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("non_existing_ref", /*inclusive=*/true);
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Cannot find ref:
non_existing_ref")));
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithTag) {
+ auto version = GetParam();
+
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+ auto snapshot_b = MakeAppendSnapshot(
+ version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet",
"/path/to/file_c.parquet"});
+ auto snapshot_current = MakeAppendSnapshot(
+ version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet",
"/path/to/file_a2.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a, snapshot_b, snapshot_current}, 3000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 3000L, .retention =
SnapshotRef::Branch{}})},
+ {"t1", std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = 1000L, .retention =
SnapshotRef::Tag{}})},
+ {"t2", std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = 2000L, .retention =
SnapshotRef::Tag{}})}});
+
+ // Test: from_snapshot_inclusive(t1) should return 5 files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("t1", /*inclusive=*/true);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 5);
+ }
+
+ // Test: from_snapshot_inclusive(t1).to_snapshot(t2) should return 3 files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("t1", /*inclusive=*/true).ToSnapshot("t2");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 3);
+ }
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithBranchShouldFail) {
+ auto version = GetParam();
+
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a}, 1000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 1000L, .retention =
SnapshotRef::Branch{}})},
+ {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 1000L, .retention =
SnapshotRef::Branch{}})}});
+
+ // Test: from_snapshot_inclusive(branch_name) should fail
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("b1", /*inclusive=*/true);
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Ref b1 is not a tag")));
+ }
+
+ // Test: to_snapshot(branch_name) should fail
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot("b1");
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Ref b1 is not a tag")));
+ }
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranch) {
+ auto version = GetParam();
+
+ // Common ancestor
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+ // Main branch snapshots
+ auto snapshot_main_b = MakeAppendSnapshot(
+ version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet",
"/path/to/file_c.parquet"});
+ auto snapshot_current = MakeAppendSnapshot(
+ version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet",
"/path/to/file_a2.parquet"});
+ // Branch b1 snapshots
+ auto snapshot_branch_b =
+ MakeAppendSnapshot(version, 4000L, 1000L, 2L,
{"/path/to/file_c_branch.parquet"});
+ auto snapshot_branch_c =
+ MakeAppendSnapshot(version, 5000L, 4000L, 3L,
{"/path/to/file_c_branch2.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a, snapshot_main_b, snapshot_current, snapshot_branch_b,
+ snapshot_branch_c},
+ 3000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 3000L, .retention =
SnapshotRef::Branch{}})},
+ {"t1", std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = 1000L, .retention =
SnapshotRef::Tag{}})},
+ {"t2", std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = 2000L, .retention =
SnapshotRef::Tag{}})},
+ {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 5000L, .retention =
SnapshotRef::Branch{}})}});
+
+ // Test: from_snapshot_inclusive(t1) on main should return 5 files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("t1", /*inclusive=*/true);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 5);
+ }
+
+ // Test: from_snapshot_inclusive(t1).use_branch(b1) should return 3 files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("t1", /*inclusive=*/true).UseBranch("b1");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 3);
+ }
+
+ // Test: to_snapshot(snapshot_branch_b).use_branch(b1) should return 2 files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->ToSnapshot(4000L).UseBranch("b1");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ }
+
+ // Test: to_snapshot(snapshot_branch_c).use_branch(b1) should return 3 files
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->ToSnapshot(5000L).UseBranch("b1");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 3);
+ }
+
+ // Test:
from_snapshot_exclusive(t1).to_snapshot(snapshot_branch_b).use_branch(b1)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("t1",
/*inclusive=*/false).ToSnapshot(4000L).UseBranch("b1");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 1);
+ }
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranchWithTagShouldFail) {
+ auto version = GetParam();
+
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a}, 1000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 1000L, .retention =
SnapshotRef::Branch{}})},
+ {"t1", std::make_shared<SnapshotRef>(
+ SnapshotRef{.snapshot_id = 1000L, .retention =
SnapshotRef::Tag{}})}});
+
+ // Test: use_branch(tag_name) should fail
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(1000L, /*inclusive=*/true).UseBranch("t1");
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Ref t1 is not a branch")));
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranchWithInvalidSnapshotShouldFail) {
+ auto version = GetParam();
+
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+ auto snapshot_main_b = MakeAppendSnapshot(
+ version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet",
"/path/to/file_c.parquet"});
+ auto snapshot_branch_b =
+ MakeAppendSnapshot(version, 3000L, 1000L, 2L,
{"/path/to/file_d.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a, snapshot_main_b, snapshot_branch_b}, 2000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 2000L, .retention =
SnapshotRef::Branch{}})},
+ {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 3000L, .retention =
SnapshotRef::Branch{}})}});
+
+ // Test: to_snapshot(snapshot_main_b).use_branch(b1) should fail
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->ToSnapshot(2000L).UseBranch("b1");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ EXPECT_THAT(
+ scan->PlanFiles(),
+ ::testing::AllOf(
+ IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage(
+ "End snapshot is not a valid snapshot on the current branch:
b1")));
+ }
+
+ // Test: from_snapshot_inclusive(snapshot_main_b).use_branch(b1) should fail
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(2000L, /*inclusive=*/true).UseBranch("b1");
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ EXPECT_THAT(
+ scan->PlanFiles(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Starting snapshot (inclusive) 2000
is not an "
+ "ancestor of end snapshot 3000")));
+ }
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranchWithNonExistingRef) {
+ auto metadata = MakeTableMetadata({}, -1L);
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->UseBranch("non_existing_ref");
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Cannot find ref:
non_existing_ref")));
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotExclusive) {
+ auto version = GetParam();
+
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+ auto snapshot_b =
+ MakeAppendSnapshot(version, 2000L, 1000L, 2L,
{"/path/to/file_b.parquet"});
+ auto snapshot_c =
+ MakeAppendSnapshot(version, 3000L, 2000L, 3L,
{"/path/to/file_c.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a, snapshot_b, snapshot_c}, 3000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 3000L, .retention =
SnapshotRef::Branch{}})}});
+
+ // Test: from_snapshot_exclusive(snapshot_a) should return 2 files (B, C)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(1000L, /*inclusive=*/false);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
+ testing::UnorderedElementsAre("/path/to/file_b.parquet",
+ "/path/to/file_c.parquet"));
+ }
+
+ // Test: from_snapshot_exclusive(snapshot_a).to_snapshot(snapshot_b) should
return 1
+ // file (B)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot(1000L, /*inclusive=*/false).ToSnapshot(2000L);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 1);
+ EXPECT_EQ(tasks[0]->data_file()->file_path, "/path/to/file_b.parquet");
+ }
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithNonExistingRef) {
+ auto metadata = MakeTableMetadata({}, -1L);
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->FromSnapshot("nonExistingRef", /*inclusive=*/false);
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Cannot find ref:
nonExistingRef")));
+}
+
// An exclusive starting tag excludes the tagged snapshot's own files; only
// files appended after it (optionally bounded by an end tag) are planned.
TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithTag) {
  auto version = GetParam();

  // Linear history A(1000) -> B(2000) -> current(3000); A adds one file,
  // B and current add two files each.
  auto snapshot_a =
      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
  auto snapshot_b = MakeAppendSnapshot(
      version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"});
  auto snapshot_current = MakeAppendSnapshot(
      version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"});

  // Tags t1/t2 pin snapshots A/B; "main" is the current branch head.
  auto metadata = MakeTableMetadata(
      {snapshot_a, snapshot_b, snapshot_current}, 3000L,
      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                    .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
       {"t1", std::make_shared<SnapshotRef>(
                  SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})},
       {"t2", std::make_shared<SnapshotRef>(
                  SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}});

  // Test: from_snapshot_exclusive(t1) should return 4 files
  // (everything appended by B and current; A itself is excluded).
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->FromSnapshot("t1", /*inclusive=*/false);
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 4);
  }

  // Test: from_snapshot_exclusive(t1).to_snapshot(t2) should return 2 files
  // (only the files appended by B).
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->FromSnapshot("t1", /*inclusive=*/false).ToSnapshot("t2");
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 2);
  }
}
+
// A named starting ref must be a tag; passing a branch name is a validation
// error surfaced by Build().
TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithBranchShouldFail) {
  auto version = GetParam();

  auto snapshot_a =
      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});

  // "b1" is a branch (not a tag) pointing at the same snapshot as "main".
  auto metadata = MakeTableMetadata(
      {snapshot_a}, 1000L,
      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                    .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})},
       {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
                  .snapshot_id = 1000L, .retention = SnapshotRef::Branch{}})}});

  ICEBERG_UNWRAP_OR_FAIL(auto builder,
                         IncrementalAppendScanBuilder::Make(metadata, file_io_));
  builder->FromSnapshot("b1", /*inclusive=*/false);
  EXPECT_THAT(builder->Build(),
              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
                               HasErrorMessage("Ref b1 is not a tag")));
}
+
+TEST_P(IncrementalAppendScanTest, ToSnapshot) {
+ auto version = GetParam();
+
+ auto snapshot_a =
+ MakeAppendSnapshot(version, 1000L, std::nullopt, 1L,
{"/path/to/file_a.parquet"});
+ auto snapshot_b =
+ MakeAppendSnapshot(version, 2000L, 1000L, 2L,
{"/path/to/file_b.parquet"});
+ auto snapshot_c =
+ MakeAppendSnapshot(version, 3000L, 2000L, 3L,
{"/path/to/file_c.parquet"});
+
+ auto metadata = MakeTableMetadata(
+ {snapshot_a, snapshot_b, snapshot_c}, 3000L,
+ {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+ .snapshot_id = 3000L, .retention =
SnapshotRef::Branch{}})}});
+
+ // Test: to_snapshot(snapshot_b) should return 2 files (A, B)
+ {
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->ToSnapshot(2000L);
+ ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+ ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+ ASSERT_EQ(tasks.size(), 2);
+ EXPECT_THAT(GetPaths(tasks),
+ testing::UnorderedElementsAre("/path/to/file_a.parquet",
+ "/path/to/file_b.parquet"));
+ }
+}
+
// An end tag bounds the scan at the tagged snapshot, following that
// snapshot's own ancestry even when it is not on the "main" branch.
TEST_P(IncrementalAppendScanTest, ToSnapshotWithTag) {
  auto version = GetParam();

  // History forks at B(2000): main advances to current(3000) while branch b1
  // advances to branch_b(4000).
  auto snapshot_a =
      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
  auto snapshot_b =
      MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
  auto snapshot_current =
      MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_b2.parquet"});
  auto snapshot_branch_b =
      MakeAppendSnapshot(version, 4000L, 2000L, 3L, {"/path/to/file_c.parquet"});

  // Tags: t1 -> B (on main's ancestry), t2 -> branch_b (on branch b1).
  auto metadata = MakeTableMetadata(
      {snapshot_a, snapshot_b, snapshot_current, snapshot_branch_b}, 3000L,
      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                    .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
       {"b1", std::make_shared<SnapshotRef>(
                  SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})},
       {"t1", std::make_shared<SnapshotRef>(
                  SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})},
       {"t2", std::make_shared<SnapshotRef>(
                  SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Tag{}})}});

  // Test: to_snapshot(t1) should return 2 files
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->ToSnapshot("t1");
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 2);
  }

  // Test: to_snapshot(t2) should return 3 files (on branch b1)
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->ToSnapshot("t2");
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 3);
  }
}
+
+TEST_P(IncrementalAppendScanTest, ToSnapshotWithNonExistingRef) {
+ auto metadata = MakeTableMetadata({}, -1L);
+ ICEBERG_UNWRAP_OR_FAIL(auto builder,
+ IncrementalAppendScanBuilder::Make(metadata,
file_io_));
+ builder->ToSnapshot("non_existing_ref");
+ EXPECT_THAT(builder->Build(),
+ ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+ HasErrorMessage("Cannot find ref:
non_existing_ref")));
+}
+
// A named end ref must be a tag; passing a branch name is a validation error
// surfaced by Build().
TEST_P(IncrementalAppendScanTest, ToSnapshotWithBranchShouldFail) {
  auto version = GetParam();

  auto snapshot_a =
      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
  auto snapshot_b =
      MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});

  // "b1" is a branch (not a tag) pointing at the same snapshot as "main".
  auto metadata = MakeTableMetadata(
      {snapshot_a, snapshot_b}, 2000L,
      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                    .snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})},
       {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
                  .snapshot_id = 2000L, .retention = SnapshotRef::Branch{}})}});

  ICEBERG_UNWRAP_OR_FAIL(auto builder,
                         IncrementalAppendScanBuilder::Make(metadata, file_io_));
  builder->ToSnapshot("b1");
  EXPECT_THAT(builder->Build(),
              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
                               HasErrorMessage("Ref b1 is not a tag")));
}
+
// With two root snapshots (as after expiring the original root), planning
// must only traverse the end snapshot's own parent chain, and must reject a
// starting snapshot that is not an ancestor of the end snapshot.
TEST_P(IncrementalAppendScanTest, MultipleRootSnapshots) {
  auto version = GetParam();

  // Snapshot A (will be "expired" by not having it as parent of C)
  auto snapshot_a =
      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
  // Snapshot B (staged, orphaned - not an ancestor of main branch)
  auto snapshot_b =
      MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
  // Snapshot C (new root after A is expired)
  auto snapshot_c =
      MakeAppendSnapshot(version, 3000L, std::nullopt, 3L, {"/path/to/file_c.parquet"});
  // Snapshot D
  auto snapshot_d =
      MakeAppendSnapshot(version, 4000L, 3000L, 4L, {"/path/to/file_d.parquet"});

  // Note: snapshot_a is kept in metadata but not in the parent chain of C/D
  // This simulates expiring snapshot A, creating two root snapshots (B and C)
  auto metadata = MakeTableMetadata(
      {snapshot_a, snapshot_b, snapshot_c, snapshot_d}, 4000L,
      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
                    .snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})}});

  // Test: to_snapshot(snapshot_d) should discover snapshots C and D only
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->ToSnapshot(4000L);
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
    ASSERT_EQ(tasks.size(), 2);
    EXPECT_THAT(GetPaths(tasks),
                testing::UnorderedElementsAre("/path/to/file_c.parquet",
                                              "/path/to/file_d.parquet"));
  }

  // Test: from_snapshot_exclusive(snapshot_b).to_snapshot(snapshot_d) should fail
  // because B is not a parent ancestor of D
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->FromSnapshot(2000L, /*inclusive=*/false).ToSnapshot(4000L);
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    EXPECT_THAT(
        scan->PlanFiles(),
        ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
                         HasErrorMessage("Starting snapshot (exclusive) 2000 is not a "
                                         "parent ancestor of end snapshot 4000")));
  }

  // Test: from_snapshot_inclusive(snapshot_b).to_snapshot(snapshot_d) should fail
  // because B is not an ancestor of D
  {
    ICEBERG_UNWRAP_OR_FAIL(auto builder,
                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
    builder->FromSnapshot(2000L, /*inclusive=*/true).ToSnapshot(4000L);
    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
    EXPECT_THAT(
        scan->PlanFiles(),
        ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
                         HasErrorMessage("Starting snapshot (inclusive) 2000 is not an "
                                         "ancestor of end snapshot 4000")));
  }
}
+
// Run the whole suite once per Iceberg table format version (v1, v2, v3).
INSTANTIATE_TEST_SUITE_P(IncrementalAppendScanVersions, IncrementalAppendScanTest,
                         testing::Values(1, 2, 3));
+
+} // namespace iceberg
diff --git a/src/iceberg/test/scan_test_base.h
b/src/iceberg/test/scan_test_base.h
new file mode 100644
index 00000000..65a4e053
--- /dev/null
+++ b/src/iceberg/test/scan_test_base.h
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
/// \brief Base class for scan-related tests providing common test utilities.
///
/// This class provides common setup and helper functions for testing
/// TableScan and IncrementalScan implementations.
///
/// The test parameter (int8_t) is the Iceberg table format version; the
/// helpers use it to decide which version-gated fields (sequence numbers,
/// first-row ids) to populate when writing manifests and manifest lists.
class ScanTestBase : public testing::TestWithParam<int8_t> {
 protected:
  void SetUp() override {
    // Register Avro readers/writers used by the manifest writers below.
    avro::RegisterAll();

    // In-memory FileIO so tests never touch the real filesystem.
    file_io_ = arrow::MakeMockFileIO();
    // Simple two-column schema shared by all scan tests.
    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
    unpartitioned_spec_ = PartitionSpec::Unpartitioned();

    // bucket(16) over the "data" column for partitioned test cases.
    ICEBERG_UNWRAP_OR_FAIL(
        partitioned_spec_,
        PartitionSpec::Make(
            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
                                           "data_bucket_16_2", Transform::Bucket(16))}));
  }

  /// \brief Generate a unique manifest file path.
  std::string MakeManifestPath() {
    // Per-fixture counter plus a wall-clock tick keeps paths unique both
    // within a test and across test runs.
    return std::format("manifest-{}-{}.avro", manifest_counter_++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  /// \brief Generate a unique manifest list file path.
  std::string MakeManifestListPath() {
    return std::format("manifest-list-{}-{}.avro", manifest_list_counter_++,
                       std::chrono::system_clock::now().time_since_epoch().count());
  }

  /// \brief Create a manifest entry.
  ///
  /// The file sequence number is set equal to the data sequence number.
  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
                          int64_t sequence_number, std::shared_ptr<DataFile> file) {
    return ManifestEntry{
        .status = status,
        .snapshot_id = snapshot_id,
        .sequence_number = sequence_number,
        .file_sequence_number = sequence_number,
        .data_file = std::move(file),
    };
  }

  /// \brief Write a data manifest file.
  ///
  /// \param format_version Table format version; v3+ also assigns a first row id.
  /// \param snapshot_id Snapshot the manifest belongs to.
  /// \param entries Entries to write into the manifest.
  /// \param spec Partition spec of the entries (defaults to unpartitioned).
  /// \return The ManifestFile descriptor for the written manifest.
  ManifestFile WriteDataManifest(
      int8_t format_version, int64_t snapshot_id, std::vector<ManifestEntry> entries,
      std::shared_ptr<PartitionSpec> spec = PartitionSpec::Unpartitioned()) {
    const std::string manifest_path = MakeManifestPath();
    auto writer_result = ManifestWriter::MakeWriter(
        format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
        ManifestContent::kData,
        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);

    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());

    for (const auto& entry : entries) {
      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
    }

    EXPECT_THAT(writer->Close(), IsOk());
    auto manifest_result = writer->ToManifestFile();
    EXPECT_THAT(manifest_result, IsOk());
    return std::move(manifest_result.value());
  }

  /// \brief Write a delete manifest file.
  ///
  /// Same as WriteDataManifest but with ManifestContent::kDeletes and no
  /// first-row-id assignment.
  ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id,
                                   std::vector<ManifestEntry> entries,
                                   std::shared_ptr<PartitionSpec> spec) {
    const std::string manifest_path = MakeManifestPath();
    auto writer_result =
        ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
                                   spec, schema_, ManifestContent::kDeletes);

    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());

    for (const auto& entry : entries) {
      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
    }

    EXPECT_THAT(writer->Close(), IsOk());
    auto manifest_result = writer->ToManifestFile();
    EXPECT_THAT(manifest_result, IsOk());
    return std::move(manifest_result.value());
  }

  /// \brief Write a manifest list file.
  ///
  /// Sequence number is only written for v2+; first row id only for v3+.
  /// \return The path of the written manifest list.
  std::string WriteManifestList(int8_t format_version, int64_t snapshot_id,
                                int64_t parent_snapshot_id, int64_t sequence_number,
                                const std::vector<ManifestFile>& manifests) {
    const std::string manifest_list_path = MakeManifestListPath();

    auto writer_result = ManifestListWriter::MakeWriter(
        format_version, snapshot_id, parent_snapshot_id, manifest_list_path, file_io_,
        /*sequence_number=*/format_version >= 2 ? std::optional(sequence_number)
                                                : std::nullopt,
        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);

    EXPECT_THAT(writer_result, IsOk());
    auto writer = std::move(writer_result.value());
    EXPECT_THAT(writer->AddAll(manifests), IsOk());
    EXPECT_THAT(writer->Close(), IsOk());

    return manifest_list_path;
  }

  /// \brief Extract file paths from scan tasks.
  static std::vector<std::string> GetPaths(
      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
    return tasks | std::views::transform([](const auto& task) {
             return task->data_file()->file_path;
           }) |
           std::ranges::to<std::vector<std::string>>();
  }

  /// \brief Create table metadata with the given snapshots.
  ///
  /// \param snapshots Snapshots to embed; the last one's sequence number is
  ///        used as the table's last_sequence_number.
  /// \param current_snapshot_id Id of the current snapshot (-1 for none).
  /// \param refs Optional named branch/tag refs.
  /// \param default_spec Default partition spec; unpartitioned when null.
  std::shared_ptr<TableMetadata> MakeTableMetadata(
      const std::vector<std::shared_ptr<Snapshot>>& snapshots,
      int64_t current_snapshot_id,
      const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& refs = {},
      std::shared_ptr<PartitionSpec> default_spec = nullptr) {
    // Fixed timestamp (2021-01-01T00:00:00Z) keeps metadata deterministic.
    TimePointMs timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
    int64_t last_seq = snapshots.empty() ? 0L : snapshots.back()->sequence_number;
    auto effective_spec = default_spec ? default_spec : unpartitioned_spec_;

    return std::make_shared<TableMetadata>(TableMetadata{
        .format_version = GetParam(),
        .table_uuid = "test-table-uuid",
        .location = "/tmp/table",
        .last_sequence_number = last_seq,
        .last_updated_ms = timestamp_ms,
        .last_column_id = 2,
        .schemas = {schema_},
        .current_schema_id = schema_->schema_id(),
        .partition_specs = {partitioned_spec_, unpartitioned_spec_},
        .default_spec_id = effective_spec->spec_id(),
        .last_partition_id = 1000,
        .current_snapshot_id = current_snapshot_id,
        .snapshots = snapshots,
        .snapshot_log = {},
        .default_sort_order_id = 0,
        .refs = refs,
    });
  }

  /// \brief Create a data file with optional partition values.
  std::shared_ptr<DataFile> MakeDataFile(
      const std::string& path,
      PartitionValues partition = PartitionValues(std::vector<Literal>{}),
      std::shared_ptr<PartitionSpec> spec = nullptr, int64_t record_count = 1) {
    auto effective_spec = spec ? spec : unpartitioned_spec_;
    return std::make_shared<DataFile>(DataFile{
        .file_path = path,
        .file_format = FileFormatType::kParquet,
        .partition = std::move(partition),
        .record_count = record_count,
        .file_size_in_bytes = 10,
        .sort_order_id = 0,
        .partition_spec_id = effective_spec->spec_id(),
    });
  }

  /// \brief Create an append snapshot with the given files (string paths).
  ///
  /// Convenience wrapper that pairs every path with an empty partition tuple
  /// and delegates to MakeAppendSnapshotWithPartitionValues.
  std::shared_ptr<Snapshot> MakeAppendSnapshot(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::string>& added_files,
      std::shared_ptr<PartitionSpec> spec = nullptr) {
    std::vector<std::pair<std::string, PartitionValues>> files_with_partitions;
    for (const auto& path : added_files) {
      files_with_partitions.emplace_back(path, kEmptyPartition);
    }
    return MakeAppendSnapshotWithPartitionValues(format_version, snapshot_id,
                                                 parent_snapshot_id, sequence_number,
                                                 files_with_partitions, spec);
  }

  /// \brief Create an append snapshot with the given files (with partition values).
  ///
  /// Writes one data manifest containing all added files plus the manifest
  /// list that references it, then builds a Snapshot with operation "append".
  std::shared_ptr<Snapshot> MakeAppendSnapshotWithPartitionValues(
      int8_t format_version, int64_t snapshot_id,
      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
      const std::vector<std::pair<std::string, PartitionValues>>& added_files,
      std::shared_ptr<PartitionSpec> spec = nullptr) {
    auto effective_spec = spec ? spec : unpartitioned_spec_;
    std::vector<ManifestEntry> entries;
    entries.reserve(added_files.size());
    for (const auto& [path, partition] : added_files) {
      auto file = MakeDataFile(path, partition, effective_spec);
      entries.push_back(
          MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number, file));
    }

    auto manifest = WriteDataManifest(format_version, snapshot_id, std::move(entries),
                                      effective_spec);
    // Manifest list writer takes a concrete parent id; 0 stands for "no parent".
    int64_t parent_id = parent_snapshot_id.value_or(0L);
    auto manifest_list = WriteManifestList(format_version, snapshot_id, parent_id,
                                           sequence_number, {manifest});
    // Derive a distinct timestamp per sequence number for a plausible history.
    TimePointMs timestamp_ms =
        TimePointMsFromUnixMs(1609459200000L + sequence_number * 1000);
    return std::make_shared<Snapshot>(Snapshot{
        .snapshot_id = snapshot_id,
        .parent_snapshot_id = parent_snapshot_id,
        .sequence_number = sequence_number,
        .timestamp_ms = timestamp_ms,
        .manifest_list = manifest_list,
        .summary = {{"operation", "append"}},
        .schema_id = schema_->schema_id(),
    });
  }

  std::shared_ptr<FileIO> file_io_;
  std::shared_ptr<Schema> schema_;
  std::shared_ptr<PartitionSpec> partitioned_spec_;
  std::shared_ptr<PartitionSpec> unpartitioned_spec_;

 private:
  // Counters feeding MakeManifestPath()/MakeManifestListPath() uniqueness.
  int manifest_counter_ = 0;
  int manifest_list_counter_ = 0;
  // Shared empty partition tuple for unpartitioned data files.
  constexpr static PartitionValues kEmptyPartition{};
};
+
+} // namespace iceberg
diff --git a/src/iceberg/test/table_scan_test.cc
b/src/iceberg/test/table_scan_test.cc
index d9fc1e92..e4a3d21f 100644
--- a/src/iceberg/test/table_scan_test.cc
+++ b/src/iceberg/test/table_scan_test.cc
@@ -17,53 +17,26 @@
* under the License.
*/
-#include "iceberg/table_scan.h"
-
-#include <chrono>
-#include <format>
#include <memory>
#include <optional>
#include <string>
+#include <unordered_map>
#include <vector>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
-#include "iceberg/arrow/arrow_file_io.h"
-#include "iceberg/avro/avro_register.h"
#include "iceberg/expression/expressions.h"
-#include "iceberg/manifest/manifest_entry.h"
-#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/manifest/manifest_reader.h"
-#include "iceberg/manifest/manifest_writer.h"
-#include "iceberg/partition_spec.h"
-#include "iceberg/schema.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/transform.h"
-#include "iceberg/type.h"
-#include "iceberg/util/timepoint.h"
+#include "iceberg/test/scan_test_base.h"
namespace iceberg {
-class TableScanTest : public testing::TestWithParam<int8_t> {
+class TableScanTest : public ScanTestBase {
protected:
void SetUp() override {
- avro::RegisterAll();
-
- file_io_ = arrow::MakeMockFileIO();
- schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
- SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
- SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
- unpartitioned_spec_ = PartitionSpec::Unpartitioned();
-
- ICEBERG_UNWRAP_OR_FAIL(
- partitioned_spec_,
- PartitionSpec::Make(
- /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
- "data_bucket_16_2",
Transform::Bucket(16))}));
-
+ ScanTestBase::SetUp();
MakeTableMetadata();
}
@@ -134,12 +107,6 @@ class TableScanTest : public
testing::TestWithParam<int8_t> {
});
}
- std::string MakeManifestPath() {
- static int counter = 0;
- return std::format("manifest-{}-{}.avro", counter++,
-
std::chrono::system_clock::now().time_since_epoch().count());
- }
-
std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
const PartitionValues& partition,
int32_t spec_id, int64_t record_count
= 1,
@@ -164,84 +131,12 @@ class TableScanTest : public
testing::TestWithParam<int8_t> {
return file;
}
- ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
- int64_t sequence_number, std::shared_ptr<DataFile>
file) {
- return ManifestEntry{
- .status = status,
- .snapshot_id = snapshot_id,
- .sequence_number = sequence_number,
- .file_sequence_number = sequence_number,
- .data_file = std::move(file),
- };
- }
-
- ManifestFile WriteDataManifest(int8_t format_version, int64_t snapshot_id,
- std::vector<ManifestEntry> entries,
- std::shared_ptr<PartitionSpec> spec) {
- const std::string manifest_path = MakeManifestPath();
- auto writer_result = ManifestWriter::MakeWriter(
- format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
- ManifestContent::kData,
- /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) :
std::nullopt);
-
- EXPECT_THAT(writer_result, IsOk());
- auto writer = std::move(writer_result.value());
-
- for (const auto& entry : entries) {
- EXPECT_THAT(writer->WriteEntry(entry), IsOk());
- }
-
- EXPECT_THAT(writer->Close(), IsOk());
- auto manifest_result = writer->ToManifestFile();
- EXPECT_THAT(manifest_result, IsOk());
- return std::move(manifest_result.value());
- }
-
- ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id,
- std::vector<ManifestEntry> entries,
- std::shared_ptr<PartitionSpec> spec) {
- const std::string manifest_path = MakeManifestPath();
- auto writer_result =
- ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path,
file_io_,
- spec, schema_, ManifestContent::kDeletes);
-
- EXPECT_THAT(writer_result, IsOk());
- auto writer = std::move(writer_result.value());
-
- for (const auto& entry : entries) {
- EXPECT_THAT(writer->WriteEntry(entry), IsOk());
- }
-
- EXPECT_THAT(writer->Close(), IsOk());
- auto manifest_result = writer->ToManifestFile();
- EXPECT_THAT(manifest_result, IsOk());
- return std::move(manifest_result.value());
- }
-
- std::string MakeManifestListPath() {
- static int counter = 0;
- return std::format("manifest-list-{}-{}.avro", counter++,
-
std::chrono::system_clock::now().time_since_epoch().count());
- }
-
std::string WriteManifestList(int8_t format_version, int64_t snapshot_id,
int64_t sequence_number,
const std::vector<ManifestFile>& manifests) {
- const std::string manifest_list_path = MakeManifestListPath();
- constexpr int64_t kParentSnapshotId = 0L;
-
- auto writer_result = ManifestListWriter::MakeWriter(
- format_version, snapshot_id, kParentSnapshotId, manifest_list_path,
file_io_,
- /*sequence_number=*/format_version >= 2 ?
std::optional(sequence_number)
- : std::nullopt,
- /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) :
std::nullopt);
-
- EXPECT_THAT(writer_result, IsOk());
- auto writer = std::move(writer_result.value());
- EXPECT_THAT(writer->AddAll(manifests), IsOk());
- EXPECT_THAT(writer->Close(), IsOk());
-
- return manifest_list_path;
+ return ScanTestBase::WriteManifestList(format_version, snapshot_id,
+ 0L /*parent_snapshot_id*/,
sequence_number,
+ manifests);
}
std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
@@ -249,18 +144,6 @@ class TableScanTest : public
testing::TestWithParam<int8_t> {
{unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
}
- static std::vector<std::string> GetPaths(
- const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
- return tasks | std::views::transform([](const auto& task) {
- return task->data_file()->file_path;
- }) |
- std::ranges::to<std::vector<std::string>>();
- }
-
- std::shared_ptr<FileIO> file_io_;
- std::shared_ptr<Schema> schema_;
- std::shared_ptr<PartitionSpec> partitioned_spec_;
- std::shared_ptr<PartitionSpec> unpartitioned_spec_;
std::shared_ptr<TableMetadata> table_metadata_;
};