This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new cdf05d65 feat: implement IncrementalAppendScan (#590)
cdf05d65 is described below

commit cdf05d6586a322410789717d7c5c81b45c18ae78
Author: wzhuo <[email protected]>
AuthorDate: Fri Mar 27 10:31:59 2026 +0800

    feat: implement IncrementalAppendScan (#590)
---
 src/iceberg/manifest/manifest_list.h             |  13 +-
 src/iceberg/table_scan.cc                        | 223 ++++++++-
 src/iceberg/table_scan.h                         |  18 +-
 src/iceberg/test/CMakeLists.txt                  |   1 +
 src/iceberg/test/incremental_append_scan_test.cc | 601 +++++++++++++++++++++++
 src/iceberg/test/scan_test_base.h                | 275 +++++++++++
 src/iceberg/test/table_scan_test.cc              | 131 +----
 7 files changed, 1118 insertions(+), 144 deletions(-)

diff --git a/src/iceberg/manifest/manifest_list.h b/src/iceberg/manifest/manifest_list.h
index 2f3185a1..65271368 100644
--- a/src/iceberg/manifest/manifest_list.h
+++ b/src/iceberg/manifest/manifest_list.h
@@ -230,7 +230,9 @@ struct ICEBERG_EXPORT ManifestFile {
       kFirstRowIdFieldId, "first_row_id", int64(),
       "Starting row ID to assign to new rows in ADDED data files");
 
-  bool operator==(const ManifestFile& other) const = default;
+  bool operator==(const ManifestFile& other) const {
+    return manifest_path == other.manifest_path;
+  }
 
   static const std::shared_ptr<StructType>& Type();
 };
@@ -272,3 +274,12 @@ ICEBERG_EXPORT inline constexpr Result<ManifestContent> ManifestContentFromStrin
 }
 
 }  // namespace iceberg
+
+namespace std {
+template <>
+struct hash<iceberg::ManifestFile> {
+  size_t operator()(const iceberg::ManifestFile& manifest_file) const {
+    return std::hash<std::string>{}(manifest_file.manifest_path);
+  }
+};
+}  // namespace std
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 937af94b..ed2ede70 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -171,6 +171,90 @@ Status TableScanContext::Validate() const {
   return {};
 }
 
+bool IsScanCurrentLineage(const TableScanContext& context) {
+  return !context.from_snapshot_id.has_value() && 
!context.to_snapshot_id.has_value();
+}
+
+Result<int64_t> ToSnapshotIdInclusive(const TableScanContext& context,
+                                      const TableMetadata& metadata) {
+  // Get the branch's current snapshot ID if branch is set
+  std::shared_ptr<Snapshot> branch_snapshot;
+  const std::string& branch = context.branch;
+  if (!branch.empty()) {
+    auto iter = metadata.refs.find(branch);
+    ICEBERG_CHECK(iter != metadata.refs.end() && iter->second != nullptr,
+                  "Cannot find branch: {}", branch);
+    ICEBERG_ASSIGN_OR_RAISE(branch_snapshot,
+                            metadata.SnapshotById(iter->second->snapshot_id));
+  }
+
+  if (context.to_snapshot_id.has_value()) {
+    int64_t to_snapshot_id_value = context.to_snapshot_id.value();
+
+    if (branch_snapshot != nullptr) {
+      // Validate `to_snapshot_id` is on the current branch
+      ICEBERG_ASSIGN_OR_RAISE(
+          bool is_ancestor,
+          SnapshotUtil::IsAncestorOf(metadata, branch_snapshot->snapshot_id,
+                                     to_snapshot_id_value));
+      ICEBERG_CHECK(is_ancestor,
+                    "End snapshot is not a valid snapshot on the current 
branch: {}",
+                    branch);
+    }
+
+    return to_snapshot_id_value;
+  }
+
+  // If to_snapshot_id is not set, use branch's current snapshot if branch is set
+  if (branch_snapshot != nullptr) {
+    return branch_snapshot->snapshot_id;
+  }
+
+  // Get current snapshot from table's current snapshot
+  std::shared_ptr<Snapshot> current_snapshot;
+  ICEBERG_ASSIGN_OR_RAISE(current_snapshot, metadata.Snapshot());
+  ICEBERG_CHECK(current_snapshot != nullptr,
+                "End snapshot is not set and table has no current snapshot");
+  return current_snapshot->snapshot_id;
+}
+
+Result<std::optional<int64_t>> FromSnapshotIdExclusive(const TableScanContext& 
context,
+                                                       const TableMetadata& 
metadata,
+                                                       int64_t 
to_snapshot_id_inclusive) {
+  if (!context.from_snapshot_id.has_value()) {
+    return std::nullopt;
+  }
+
+  int64_t from_snapshot_id = context.from_snapshot_id.value();
+
+  // Validate `from_snapshot_id` is an ancestor of `to_snapshot_id_inclusive`
+  if (context.from_snapshot_id_inclusive) {
+    ICEBERG_ASSIGN_OR_RAISE(
+        bool is_ancestor,
+        SnapshotUtil::IsAncestorOf(metadata, to_snapshot_id_inclusive, 
from_snapshot_id));
+    ICEBERG_CHECK(
+        is_ancestor,
+        "Starting snapshot (inclusive) {} is not an ancestor of end snapshot 
{}",
+        from_snapshot_id, to_snapshot_id_inclusive);
+
+    // For inclusive behavior, return the parent snapshot ID (can be nullopt)
+    ICEBERG_ASSIGN_OR_RAISE(auto from_snapshot, 
metadata.SnapshotById(from_snapshot_id));
+    return from_snapshot->parent_snapshot_id;
+  }
+
+  // Validate there is an ancestor of `to_snapshot_id_inclusive` where parent is
+  // `from_snapshot_id`
+  ICEBERG_ASSIGN_OR_RAISE(bool is_parent_ancestor,
+                          SnapshotUtil::IsParentAncestorOf(
+                              metadata, to_snapshot_id_inclusive, 
from_snapshot_id));
+  ICEBERG_CHECK(
+      is_parent_ancestor,
+      "Starting snapshot (exclusive) {} is not a parent ancestor of end 
snapshot {}",
+      from_snapshot_id, to_snapshot_id_inclusive);
+
+  return from_snapshot_id;
+}
+
 }  // namespace internal
 
 ScanTask::~ScanTask() = default;
@@ -340,10 +424,15 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::AsOfTime(
 
 template <typename ScanType>
 TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
-    [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool inclusive)
+    int64_t from_snapshot_id, bool inclusive)
   requires IsIncrementalScan<ScanType>
 {
-  AddError(NotImplemented("Incremental scan is not implemented"));
+  if (inclusive) {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
+                                     
metadata_->SnapshotById(from_snapshot_id));
+  }
+  this->context_.from_snapshot_id = from_snapshot_id;
+  this->context_.from_snapshot_id_inclusive = inclusive;
   return *this;
 }
 
@@ -352,15 +441,20 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::FromSnapshot(
     const std::string& ref, bool inclusive)
   requires IsIncrementalScan<ScanType>
 {
-  AddError(NotImplemented("Incremental scan is not implemented"));
-  return *this;
+  auto iter = metadata_->refs.find(ref);
+  ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", 
ref);
+  ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+  ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
+                        "Ref {} is not a tag", ref);
+  return FromSnapshot(iter->second->snapshot_id, inclusive);
 }
 
 template <typename ScanType>
 TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(int64_t 
to_snapshot_id)
   requires IsIncrementalScan<ScanType>
 {
-  AddError(NotImplemented("Incremental scan is not implemented"));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, 
metadata_->SnapshotById(to_snapshot_id));
+  context_.to_snapshot_id = to_snapshot_id;
   return *this;
 }
 
@@ -368,8 +462,12 @@ template <typename ScanType>
 TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::ToSnapshot(const 
std::string& ref)
   requires IsIncrementalScan<ScanType>
 {
-  AddError(NotImplemented("Incremental scan is not implemented"));
-  return *this;
+  auto iter = metadata_->refs.find(ref);
+  ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", 
ref);
+  ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+  ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kTag,
+                        "Ref {} is not a tag", ref);
+  return ToSnapshot(iter->second->snapshot_id);
 }
 
 template <typename ScanType>
@@ -377,6 +475,11 @@ TableScanBuilder<ScanType>& TableScanBuilder<ScanType>::UseBranch(
     const std::string& branch)
   requires IsIncrementalScan<ScanType>
 {
+  auto iter = metadata_->refs.find(branch);
+  ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref: {}", 
branch);
+  ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", branch);
+  ICEBERG_BUILDER_CHECK(iter->second->type() == SnapshotRefType::kBranch,
+                        "Ref {} is not a branch", branch);
   context_.branch = branch;
   return *this;
 }
@@ -536,20 +639,109 @@ Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() co
   return manifest_group->PlanFiles();
 }
 
+// Friend function template for IncrementalScan that implements the shared PlanFiles
+// logic. It resolves the from/to snapshot range from the scan context and delegates
+// to the two-arg virtual PlanFiles() override in the concrete subclass.
+// Defined as a friend to access the protected two-arg PlanFiles().
+template <typename ScanTaskType>
+Result<std::vector<std::shared_ptr<ScanTaskType>>> ResolvePlanFiles(
+    const IncrementalScan<ScanTaskType>& scan) {
+  if (IsScanCurrentLineage(scan.context())) {
+    if (scan.metadata()->current_snapshot_id == kInvalidSnapshotId) {
+      return std::vector<std::shared_ptr<ScanTaskType>>{};
+    }
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      int64_t to_snapshot_id_inclusive,
+      internal::ToSnapshotIdInclusive(scan.context(), *scan.metadata()));
+  ICEBERG_ASSIGN_OR_RAISE(
+      std::optional<int64_t> from_snapshot_id_exclusive,
+      internal::FromSnapshotIdExclusive(scan.context(), *scan.metadata(),
+                                        to_snapshot_id_inclusive));
+
+  return scan.PlanFiles(from_snapshot_id_exclusive, to_snapshot_id_inclusive);
+}
+
 // IncrementalAppendScan implementation
 
 Result<std::unique_ptr<IncrementalAppendScan>> IncrementalAppendScan::Make(
-    [[maybe_unused]] std::shared_ptr<TableMetadata> metadata,
-    [[maybe_unused]] std::shared_ptr<Schema> schema,
-    [[maybe_unused]] std::shared_ptr<FileIO> io,
-    [[maybe_unused]] internal::TableScanContext context) {
-  return NotImplemented("IncrementalAppendScan is not implemented");
+    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+  return std::unique_ptr<IncrementalAppendScan>(new IncrementalAppendScan(
+      std::move(metadata), std::move(schema), std::move(io), 
std::move(context)));
+}
+
+Result<std::vector<std::shared_ptr<FileScanTask>>> 
IncrementalAppendScan::PlanFiles()
+    const {
+  return ResolvePlanFiles<FileScanTask>(*this);
 }
 
 Result<std::vector<std::shared_ptr<FileScanTask>>> 
IncrementalAppendScan::PlanFiles(
     std::optional<int64_t> from_snapshot_id_exclusive,
     int64_t to_snapshot_id_inclusive) const {
-  return NotImplemented("IncrementalAppendScan::PlanFiles is not implemented");
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto ancestors_snapshots,
+      SnapshotUtil::AncestorsBetween(*metadata_, to_snapshot_id_inclusive,
+                                     from_snapshot_id_exclusive));
+
+  std::vector<std::shared_ptr<Snapshot>> append_snapshots;
+  std::ranges::copy_if(ancestors_snapshots, 
std::back_inserter(append_snapshots),
+                       [](const auto& snapshot) {
+                         return snapshot != nullptr &&
+                                snapshot->Operation().has_value() &&
+                                snapshot->Operation().value() == 
DataOperation::kAppend;
+                       });
+  if (append_snapshots.empty()) {
+    return std::vector<std::shared_ptr<FileScanTask>>{};
+  }
+
+  std::unordered_set<int64_t> snapshot_ids;
+  std::ranges::transform(append_snapshots,
+                         std::inserter(snapshot_ids, snapshot_ids.end()),
+                         [](const auto& snapshot) { return 
snapshot->snapshot_id; });
+
+  std::unordered_set<ManifestFile> data_manifests;
+  for (const auto& snapshot : append_snapshots) {
+    SnapshotCache snapshot_cache(snapshot.get());
+    ICEBERG_ASSIGN_OR_RAISE(auto manifests, snapshot_cache.DataManifests(io_));
+    std::ranges::copy_if(manifests, std::inserter(data_manifests, 
data_manifests.end()),
+                         [&snapshot_ids](const ManifestFile& manifest) {
+                           return 
snapshot_ids.contains(manifest.added_snapshot_id);
+                         });
+  }
+  if (data_manifests.empty()) {
+    return std::vector<std::shared_ptr<FileScanTask>>{};
+  }
+
+  TableMetadataCache metadata_cache(metadata_.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto manifest_group,
+      ManifestGroup::Make(
+          io_, schema_, specs_by_id,
+          std::vector<ManifestFile>(data_manifests.begin(), 
data_manifests.end()), {}));
+
+  manifest_group->CaseSensitive(context_.case_sensitive)
+      .Select(ScanColumns())
+      .FilterData(filter())
+      .FilterManifestEntries([&snapshot_ids](const ManifestEntry& entry) {
+        return entry.snapshot_id.has_value() &&
+               snapshot_ids.contains(entry.snapshot_id.value()) &&
+               entry.status == ManifestStatus::kAdded;
+      })
+      .IgnoreDeleted()
+      .ColumnsToKeepStats(context_.columns_to_keep_stats);
+
+  if (context_.ignore_residuals) {
+    manifest_group->IgnoreResiduals();
+  }
+
+  return manifest_group->PlanFiles();
 }
 
 // IncrementalChangelogScan implementation
@@ -562,6 +754,11 @@ Result<std::unique_ptr<IncrementalChangelogScan>> IncrementalChangelogScan::Make
   return NotImplemented("IncrementalChangelogScan is not implemented");
 }
 
+Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
+IncrementalChangelogScan::PlanFiles() const {
+  return ResolvePlanFiles<ChangelogScanTask>(*this);
+}
+
 Result<std::vector<std::shared_ptr<ChangelogScanTask>>>
 IncrementalChangelogScan::PlanFiles(std::optional<int64_t> 
from_snapshot_id_exclusive,
                                     int64_t to_snapshot_id_inclusive) const {
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index d5bf6b4a..b21adcca 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -29,6 +29,7 @@
 
 #include "iceberg/arrow_c_data.h"
 #include "iceberg/result.h"
+#include "iceberg/table_metadata.h"
 #include "iceberg/type_fwd.h"
 #include "iceberg/util/error_collector.h"
 
@@ -355,15 +356,11 @@ class ICEBERG_EXPORT DataTableScan : public TableScan {
 /// \brief A base template class for incremental scans that read changes between
 /// snapshots, and return scan tasks of the specified type.
 template <typename ScanTaskType>
-class ICEBERG_EXPORT IncrementalScan : public TableScan {
+class IncrementalScan : public TableScan {
  public:
   ~IncrementalScan() override = default;
 
-  /// \brief Plans the scan tasks by resolving manifests and data files.
-  /// \return A Result containing scan tasks or an error.
-  Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const {
-    return NotImplemented("IncrementalScan::PlanFiles is not implemented");
-  }
+  virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles() const 
= 0;
 
  protected:
   virtual Result<std::vector<std::shared_ptr<ScanTaskType>>> PlanFiles(
@@ -371,6 +368,11 @@ class ICEBERG_EXPORT IncrementalScan : public TableScan {
       int64_t to_snapshot_id_inclusive) const = 0;
 
   using TableScan::TableScan;
+
+  // Allow the free function ResolvePlanFiles to access protected members.
+  template <typename T>
+  friend Result<std::vector<std::shared_ptr<T>>> ResolvePlanFiles(
+      const IncrementalScan<T>& scan);
 };
 
 /// \brief A scan that reads data files added between snapshots (incremental appends).
@@ -383,6 +385,8 @@ class ICEBERG_EXPORT IncrementalAppendScan : public IncrementalScan<FileScanTask
 
   ~IncrementalAppendScan() override = default;
 
+  Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const 
override;
+
  protected:
   Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles(
       std::optional<int64_t> from_snapshot_id_exclusive,
@@ -402,6 +406,8 @@ class ICEBERG_EXPORT IncrementalChangelogScan
 
   ~IncrementalChangelogScan() override = default;
 
+  Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles() const 
override;
+
  protected:
   Result<std::vector<std::shared_ptr<ChangelogScanTask>>> PlanFiles(
       std::optional<int64_t> from_snapshot_id_exclusive,
diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt
index c802df8a..6492b9bb 100644
--- a/src/iceberg/test/CMakeLists.txt
+++ b/src/iceberg/test/CMakeLists.txt
@@ -173,6 +173,7 @@ if(ICEBERG_BUILD_BUNDLE)
                    USE_BUNDLE
                    SOURCES
                    file_scan_task_test.cc
+                   incremental_append_scan_test.cc
                    table_scan_test.cc)
 
   add_iceberg_test(table_update_test
diff --git a/src/iceberg/test/incremental_append_scan_test.cc b/src/iceberg/test/incremental_append_scan_test.cc
new file mode 100644
index 00000000..559aedd8
--- /dev/null
+++ b/src/iceberg/test/incremental_append_scan_test.cc
@@ -0,0 +1,601 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <memory>
+#include <optional>
+#include <string>
+#include <vector>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include "iceberg/test/scan_test_base.h"
+
+namespace iceberg {
+
+class IncrementalAppendScanTest : public ScanTestBase {};
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusive) {
+  auto version = GetParam();
+
+  // Create 3 snapshots, each appending one file
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, 
{"/path/to/file_a.parquet"});
+  auto snapshot_b =
+      MakeAppendSnapshot(version, 2000L, 1000L, 2L, 
{"/path/to/file_b.parquet"});
+  auto snapshot_c =
+      MakeAppendSnapshot(version, 3000L, 2000L, 3L, 
{"/path/to/file_c.parquet"});
+
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_b, snapshot_c}, 3000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 3000L, .retention = 
SnapshotRef::Branch{}})}});
+
+  // Test: from_snapshot_inclusive(snapshot_a) should return 3 files (A, B, C)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot(1000L, /*inclusive=*/true);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 3);
+    EXPECT_THAT(GetPaths(tasks), testing::UnorderedElementsAre(
+                                     "/path/to/file_a.parquet", 
"/path/to/file_b.parquet",
+                                     "/path/to/file_c.parquet"));
+  }
+
+  // Test: from_snapshot_inclusive(snapshot_a).to_snapshot(snapshot_c) should return 3
+  // files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot(3000L);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 3);
+  }
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithNonExistingRef) {
+  auto metadata = MakeTableMetadata({}, -1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                         IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+  builder->FromSnapshot("non_existing_ref", /*inclusive=*/true);
+  EXPECT_THAT(builder->Build(),
+              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                               HasErrorMessage("Cannot find ref: 
non_existing_ref")));
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithTag) {
+  auto version = GetParam();
+
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, 
{"/path/to/file_a.parquet"});
+  auto snapshot_b = MakeAppendSnapshot(
+      version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", 
"/path/to/file_c.parquet"});
+  auto snapshot_current = MakeAppendSnapshot(
+      version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", 
"/path/to/file_a2.parquet"});
+
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_b, snapshot_current}, 3000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 3000L, .retention = 
SnapshotRef::Branch{}})},
+       {"t1", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 1000L, .retention = 
SnapshotRef::Tag{}})},
+       {"t2", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 2000L, .retention = 
SnapshotRef::Tag{}})}});
+
+  // Test: from_snapshot_inclusive(t1) should return 5 files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot("t1", /*inclusive=*/true);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 5);
+  }
+
+  // Test: from_snapshot_inclusive(t1).to_snapshot(t2) should return 3 files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot("t1", /*inclusive=*/true).ToSnapshot("t2");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 3);
+  }
+}
+
+TEST_P(IncrementalAppendScanTest, FromSnapshotInclusiveWithBranchShouldFail) {
+  auto version = GetParam();
+
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, 
{"/path/to/file_a.parquet"});
+
+  auto metadata = MakeTableMetadata(
+      {snapshot_a}, 1000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 1000L, .retention = 
SnapshotRef::Branch{}})},
+       {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
+                  .snapshot_id = 1000L, .retention = 
SnapshotRef::Branch{}})}});
+
+  // Test: from_snapshot_inclusive(branch_name) should fail
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot("b1", /*inclusive=*/true);
+    EXPECT_THAT(builder->Build(),
+                ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                                 HasErrorMessage("Ref b1 is not a tag")));
+  }
+
+  // Test: to_snapshot(branch_name) should fail
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot(1000L, /*inclusive=*/true).ToSnapshot("b1");
+    EXPECT_THAT(builder->Build(),
+                ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                                 HasErrorMessage("Ref b1 is not a tag")));
+  }
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranch) {
+  auto version = GetParam();
+
+  // Common ancestor
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, 
{"/path/to/file_a.parquet"});
+  // Main branch snapshots
+  auto snapshot_main_b = MakeAppendSnapshot(
+      version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", 
"/path/to/file_c.parquet"});
+  auto snapshot_current = MakeAppendSnapshot(
+      version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", 
"/path/to/file_a2.parquet"});
+  // Branch b1 snapshots
+  auto snapshot_branch_b =
+      MakeAppendSnapshot(version, 4000L, 1000L, 2L, 
{"/path/to/file_c_branch.parquet"});
+  auto snapshot_branch_c =
+      MakeAppendSnapshot(version, 5000L, 4000L, 3L, 
{"/path/to/file_c_branch2.parquet"});
+
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_main_b, snapshot_current, snapshot_branch_b,
+       snapshot_branch_c},
+      3000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 3000L, .retention = 
SnapshotRef::Branch{}})},
+       {"t1", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 1000L, .retention = 
SnapshotRef::Tag{}})},
+       {"t2", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 2000L, .retention = 
SnapshotRef::Tag{}})},
+       {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
+                  .snapshot_id = 5000L, .retention = 
SnapshotRef::Branch{}})}});
+
+  // Test: from_snapshot_inclusive(t1) on main should return 5 files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot("t1", /*inclusive=*/true);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 5);
+  }
+
+  // Test: from_snapshot_inclusive(t1).use_branch(b1) should return 3 files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot("t1", /*inclusive=*/true).UseBranch("b1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 3);
+  }
+
+  // Test: to_snapshot(snapshot_branch_b).use_branch(b1) should return 2 files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->ToSnapshot(4000L).UseBranch("b1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 2);
+  }
+
+  // Test: to_snapshot(snapshot_branch_c).use_branch(b1) should return 3 files
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->ToSnapshot(5000L).UseBranch("b1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 3);
+  }
+
+  // Test: from_snapshot_exclusive(t1).to_snapshot(snapshot_branch_b).use_branch(b1)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot("t1", 
/*inclusive=*/false).ToSnapshot(4000L).UseBranch("b1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 1);
+  }
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranchWithTagShouldFail) {
+  auto version = GetParam();
+
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, 
{"/path/to/file_a.parquet"});
+
+  auto metadata = MakeTableMetadata(
+      {snapshot_a}, 1000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 1000L, .retention = 
SnapshotRef::Branch{}})},
+       {"t1", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 1000L, .retention = 
SnapshotRef::Tag{}})}});
+
+  // Test: use_branch(tag_name) should fail
+  ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                         IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+  builder->FromSnapshot(1000L, /*inclusive=*/true).UseBranch("t1");
+  EXPECT_THAT(builder->Build(),
+              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                               HasErrorMessage("Ref t1 is not a branch")));
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranchWithInvalidSnapshotShouldFail) {
+  auto version = GetParam();
+
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, 
{"/path/to/file_a.parquet"});
+  auto snapshot_main_b = MakeAppendSnapshot(
+      version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", 
"/path/to/file_c.parquet"});
+  auto snapshot_branch_b =
+      MakeAppendSnapshot(version, 3000L, 1000L, 2L, 
{"/path/to/file_d.parquet"});
+
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_main_b, snapshot_branch_b}, 2000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 2000L, .retention = 
SnapshotRef::Branch{}})},
+       {"b1", std::make_shared<SnapshotRef>(SnapshotRef{
+                  .snapshot_id = 3000L, .retention = 
SnapshotRef::Branch{}})}});
+
+  // Test: to_snapshot(snapshot_main_b).use_branch(b1) should fail
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->ToSnapshot(2000L).UseBranch("b1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    EXPECT_THAT(
+        scan->PlanFiles(),
+        ::testing::AllOf(
+            IsError(ErrorKind::kValidationFailed),
+            HasErrorMessage(
+                "End snapshot is not a valid snapshot on the current branch: 
b1")));
+  }
+
+  // Test: from_snapshot_inclusive(snapshot_main_b).use_branch(b1) should fail
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+    builder->FromSnapshot(2000L, /*inclusive=*/true).UseBranch("b1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    EXPECT_THAT(
+        scan->PlanFiles(),
+        ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                         HasErrorMessage("Starting snapshot (inclusive) 2000 
is not an "
+                                         "ancestor of end snapshot 3000")));
+  }
+}
+
+TEST_P(IncrementalAppendScanTest, UseBranchWithNonExistingRef) {
+  auto metadata = MakeTableMetadata({}, -1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                         IncrementalAppendScanBuilder::Make(metadata, 
file_io_));
+  builder->UseBranch("non_existing_ref");
+  EXPECT_THAT(builder->Build(),
+              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                               HasErrorMessage("Cannot find ref: 
non_existing_ref")));
+}
+
+// Exclusive start snapshot given by id: the files of the start snapshot itself are
+// not part of the planned tasks.
+TEST_P(IncrementalAppendScanTest, FromSnapshotExclusive) {
+  const auto format_version = GetParam();
+
+  // Linear history on main: 1000 (file_a) <- 2000 (file_b) <- 3000 (file_c).
+  auto snap_1000 = MakeAppendSnapshot(format_version, 1000L, std::nullopt, 1L,
+                                      {"/path/to/file_a.parquet"});
+  auto snap_2000 = MakeAppendSnapshot(format_version, 2000L, 1000L, 2L,
+                                      {"/path/to/file_b.parquet"});
+  auto snap_3000 = MakeAppendSnapshot(format_version, 3000L, 2000L, 3L,
+                                      {"/path/to/file_c.parquet"});
+
+  auto main_ref = std::make_shared<SnapshotRef>(
+      SnapshotRef{.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}});
+  auto metadata =
+      MakeTableMetadata({snap_1000, snap_2000, snap_3000}, 3000L, {{"main", main_ref}});
+
+  // Open-ended range after snapshot 1000: expects the files of 2000 and 3000 (B, C).
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    scan_builder->FromSnapshot(1000L, /*inclusive=*/false);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, scan_builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto planned, scan->PlanFiles());
+    ASSERT_EQ(planned.size(), 2);
+    EXPECT_THAT(GetPaths(planned),
+                testing::UnorderedElementsAre("/path/to/file_b.parquet",
+                                              "/path/to/file_c.parquet"));
+  }
+
+  // Range (1000, 2000]: expects only the file of snapshot 2000 (B).
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    scan_builder->FromSnapshot(1000L, /*inclusive=*/false).ToSnapshot(2000L);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, scan_builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto planned, scan->PlanFiles());
+    ASSERT_EQ(planned.size(), 1);
+    EXPECT_EQ(planned.front()->data_file()->file_path, "/path/to/file_b.parquet");
+  }
+}
+
+// An exclusive start given as a ref name that is not in the table refs must make
+// Build() fail.
+TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithNonExistingRef) {
+  // Metadata without snapshots or refs; -1 is passed as the current snapshot id.
+  auto empty_metadata = MakeTableMetadata({}, -1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                         IncrementalAppendScanBuilder::Make(empty_metadata, file_io_));
+  scan_builder->FromSnapshot("nonExistingRef", /*inclusive=*/false);
+
+  auto build_result = scan_builder->Build();
+  EXPECT_THAT(build_result,
+              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                               HasErrorMessage("Cannot find ref: nonExistingRef")));
+}
+
+// Exclusive start snapshot given by tag name, optionally bounded by an end tag.
+// Besides the number of planned tasks, the planned paths are verified so that a
+// wrong set of files with the right size cannot pass.
+TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithTag) {
+  auto version = GetParam();
+
+  // Linear history on main:
+  //   1000 (file_a) <- 2000 (file_b, file_c) <- 3000 (file_d, file_a2)
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
+  auto snapshot_b = MakeAppendSnapshot(
+      version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet", "/path/to/file_c.parquet"});
+  auto snapshot_current = MakeAppendSnapshot(
+      version, 3000L, 2000L, 3L, {"/path/to/file_d.parquet", "/path/to/file_a2.parquet"});
+
+  // Tags: t1 -> 1000, t2 -> 2000.
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_b, snapshot_current}, 3000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
+       {"t1", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Tag{}})},
+       {"t2", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})}});
+
+  // Test: from_snapshot_exclusive(t1) should return 4 files (B, C, D, A2)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->FromSnapshot("t1", /*inclusive=*/false);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 4);
+    EXPECT_THAT(GetPaths(tasks),
+                testing::UnorderedElementsAre(
+                    "/path/to/file_b.parquet", "/path/to/file_c.parquet",
+                    "/path/to/file_d.parquet", "/path/to/file_a2.parquet"));
+  }
+
+  // Test: from_snapshot_exclusive(t1).to_snapshot(t2) should return 2 files (B, C)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->FromSnapshot("t1", /*inclusive=*/false).ToSnapshot("t2");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 2);
+    EXPECT_THAT(GetPaths(tasks),
+                testing::UnorderedElementsAre("/path/to/file_b.parquet",
+                                              "/path/to/file_c.parquet"));
+  }
+}
+
+// Passing a branch name to FromSnapshot(ref) is expected to fail in Build() with a
+// "not a tag" validation error.
+TEST_P(IncrementalAppendScanTest, FromSnapshotExclusiveWithBranchShouldFail) {
+  // A single snapshot (1000) referenced by two branches: "main" and "b1".
+  auto only_snapshot = MakeAppendSnapshot(GetParam(), 1000L, std::nullopt, 1L,
+                                          {"/path/to/file_a.parquet"});
+  const auto make_branch_ref = [] {
+    return std::make_shared<SnapshotRef>(
+        SnapshotRef{.snapshot_id = 1000L, .retention = SnapshotRef::Branch{}});
+  };
+  auto metadata = MakeTableMetadata(
+      {only_snapshot}, 1000L, {{"main", make_branch_ref()}, {"b1", make_branch_ref()}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                         IncrementalAppendScanBuilder::Make(metadata, file_io_));
+  scan_builder->FromSnapshot("b1", /*inclusive=*/false);
+
+  auto build_result = scan_builder->Build();
+  EXPECT_THAT(build_result, ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                                             HasErrorMessage("Ref b1 is not a tag")));
+}
+
+// End snapshot given by id without a start: the planned tasks cover the end snapshot
+// and its ancestors.
+TEST_P(IncrementalAppendScanTest, ToSnapshot) {
+  const auto format_version = GetParam();
+
+  // Linear history on main: 1000 (file_a) <- 2000 (file_b) <- 3000 (file_c).
+  auto snap_1000 = MakeAppendSnapshot(format_version, 1000L, std::nullopt, 1L,
+                                      {"/path/to/file_a.parquet"});
+  auto snap_2000 = MakeAppendSnapshot(format_version, 2000L, 1000L, 2L,
+                                      {"/path/to/file_b.parquet"});
+  auto snap_3000 = MakeAppendSnapshot(format_version, 3000L, 2000L, 3L,
+                                      {"/path/to/file_c.parquet"});
+
+  auto main_ref = std::make_shared<SnapshotRef>(
+      SnapshotRef{.snapshot_id = 3000L, .retention = SnapshotRef::Branch{}});
+  auto metadata =
+      MakeTableMetadata({snap_1000, snap_2000, snap_3000}, 3000L, {{"main", main_ref}});
+
+  // Ending at snapshot 2000 expects the files of 1000 and 2000 (A, B), not C.
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                         IncrementalAppendScanBuilder::Make(metadata, file_io_));
+  scan_builder->ToSnapshot(2000L);
+  ICEBERG_UNWRAP_OR_FAIL(auto scan, scan_builder->Build());
+  ICEBERG_UNWRAP_OR_FAIL(auto planned, scan->PlanFiles());
+  ASSERT_EQ(planned.size(), 2);
+  EXPECT_THAT(GetPaths(planned),
+              testing::UnorderedElementsAre("/path/to/file_a.parquet",
+                                            "/path/to/file_b.parquet"));
+}
+
+// End snapshot given by tag name, for a tag on the main lineage (t1) and for a tag
+// on a side branch (t2). Besides the number of planned tasks, the planned paths are
+// verified so that a wrong set of files with the right size cannot pass.
+TEST_P(IncrementalAppendScanTest, ToSnapshotWithTag) {
+  auto version = GetParam();
+
+  // History:
+  //   1000 (file_a) <- 2000 (file_b) <- 3000 (file_b2)   main
+  //                              ^----- 4000 (file_c)    b1
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
+  auto snapshot_b =
+      MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
+  auto snapshot_current =
+      MakeAppendSnapshot(version, 3000L, 2000L, 3L, {"/path/to/file_b2.parquet"});
+  auto snapshot_branch_b =
+      MakeAppendSnapshot(version, 4000L, 2000L, 3L, {"/path/to/file_c.parquet"});
+
+  // Tags: t1 -> 2000 (on main), t2 -> 4000 (head of branch b1).
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_b, snapshot_current, snapshot_branch_b}, 3000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 3000L, .retention = SnapshotRef::Branch{}})},
+       {"b1", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})},
+       {"t1", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Tag{}})},
+       {"t2", std::make_shared<SnapshotRef>(
+                  SnapshotRef{.snapshot_id = 4000L, .retention = SnapshotRef::Tag{}})}});
+
+  // Test: to_snapshot(t1) should return 2 files (A, B)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->ToSnapshot("t1");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 2);
+    EXPECT_THAT(GetPaths(tasks),
+                testing::UnorderedElementsAre("/path/to/file_a.parquet",
+                                              "/path/to/file_b.parquet"));
+  }
+
+  // Test: to_snapshot(t2) should return 3 files (A, B, C - the lineage of branch b1)
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->ToSnapshot("t2");
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 3);
+    EXPECT_THAT(GetPaths(tasks),
+                testing::UnorderedElementsAre("/path/to/file_a.parquet",
+                                              "/path/to/file_b.parquet",
+                                              "/path/to/file_c.parquet"));
+  }
+}
+
+// An end snapshot given as a ref name that is not in the table refs must make
+// Build() fail.
+TEST_P(IncrementalAppendScanTest, ToSnapshotWithNonExistingRef) {
+  // Metadata without snapshots or refs; -1 is passed as the current snapshot id.
+  auto empty_metadata = MakeTableMetadata({}, -1L);
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                         IncrementalAppendScanBuilder::Make(empty_metadata, file_io_));
+  scan_builder->ToSnapshot("non_existing_ref");
+
+  auto build_result = scan_builder->Build();
+  EXPECT_THAT(build_result,
+              ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                               HasErrorMessage("Cannot find ref: non_existing_ref")));
+}
+
+// Passing a branch name to ToSnapshot(ref) is expected to fail in Build() with a
+// "not a tag" validation error.
+TEST_P(IncrementalAppendScanTest, ToSnapshotWithBranchShouldFail) {
+  const auto format_version = GetParam();
+
+  // History 1000 (file_a) <- 2000 (file_b); both "main" and "b1" point at 2000.
+  auto snap_1000 = MakeAppendSnapshot(format_version, 1000L, std::nullopt, 1L,
+                                      {"/path/to/file_a.parquet"});
+  auto snap_2000 = MakeAppendSnapshot(format_version, 2000L, 1000L, 2L,
+                                      {"/path/to/file_b.parquet"});
+  const auto make_branch_ref = [] {
+    return std::make_shared<SnapshotRef>(
+        SnapshotRef{.snapshot_id = 2000L, .retention = SnapshotRef::Branch{}});
+  };
+  auto metadata =
+      MakeTableMetadata({snap_1000, snap_2000}, 2000L,
+                        {{"main", make_branch_ref()}, {"b1", make_branch_ref()}});
+
+  ICEBERG_UNWRAP_OR_FAIL(auto scan_builder,
+                         IncrementalAppendScanBuilder::Make(metadata, file_io_));
+  scan_builder->ToSnapshot("b1");
+
+  auto build_result = scan_builder->Build();
+  EXPECT_THAT(build_result, ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                                             HasErrorMessage("Ref b1 is not a tag")));
+}
+
+// Table history with two independent roots, A (1000) and C (3000). The main branch
+// is C <- D; A and its child B are not reachable from the head of main. Scans
+// ending at D must only see C and D, and a start snapshot outside D's lineage must
+// be rejected when planning.
+TEST_P(IncrementalAppendScanTest, MultipleRootSnapshots) {
+  auto version = GetParam();
+
+  // Snapshot A: first root (no parent); not an ancestor of the main branch head D.
+  auto snapshot_a =
+      MakeAppendSnapshot(version, 1000L, std::nullopt, 1L, {"/path/to/file_a.parquet"});
+  // Snapshot B: child of A, hence also not an ancestor of D.
+  auto snapshot_b =
+      MakeAppendSnapshot(version, 2000L, 1000L, 2L, {"/path/to/file_b.parquet"});
+  // Snapshot C: second root (no parent); start of the main branch lineage.
+  auto snapshot_c =
+      MakeAppendSnapshot(version, 3000L, std::nullopt, 3L, {"/path/to/file_c.parquet"});
+  // Snapshot D: child of C and head of main.
+  auto snapshot_d =
+      MakeAppendSnapshot(version, 4000L, 3000L, 4L, {"/path/to/file_d.parquet"});
+
+  // Note: A and B stay in the table metadata but are outside the parent chain of
+  // C/D, so the metadata holds two root snapshots (A and C).
+  auto metadata = MakeTableMetadata(
+      {snapshot_a, snapshot_b, snapshot_c, snapshot_d}, 4000L,
+      {{"main", std::make_shared<SnapshotRef>(SnapshotRef{
+                    .snapshot_id = 4000L, .retention = SnapshotRef::Branch{}})}});
+
+  // Test: to_snapshot(snapshot_d) should discover snapshots C and D only
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->ToSnapshot(4000L);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    ICEBERG_UNWRAP_OR_FAIL(auto tasks, scan->PlanFiles());
+    ASSERT_EQ(tasks.size(), 2);
+    EXPECT_THAT(GetPaths(tasks),
+                testing::UnorderedElementsAre("/path/to/file_c.parquet",
+                                              "/path/to/file_d.parquet"));
+  }
+
+  // Test: from_snapshot_exclusive(snapshot_b).to_snapshot(snapshot_d) should fail
+  // because B is not a parent ancestor of D
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->FromSnapshot(2000L, /*inclusive=*/false).ToSnapshot(4000L);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    EXPECT_THAT(
+        scan->PlanFiles(),
+        ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                         HasErrorMessage("Starting snapshot (exclusive) 2000 is not a "
+                                         "parent ancestor of end snapshot 4000")));
+  }
+
+  // Test: from_snapshot_inclusive(snapshot_b).to_snapshot(snapshot_d) should fail
+  // because B is not an ancestor of D
+  {
+    ICEBERG_UNWRAP_OR_FAIL(auto builder,
+                           IncrementalAppendScanBuilder::Make(metadata, file_io_));
+    builder->FromSnapshot(2000L, /*inclusive=*/true).ToSnapshot(4000L);
+    ICEBERG_UNWRAP_OR_FAIL(auto scan, builder->Build());
+    EXPECT_THAT(
+        scan->PlanFiles(),
+        ::testing::AllOf(IsError(ErrorKind::kValidationFailed),
+                         HasErrorMessage("Starting snapshot (inclusive) 2000 is not an "
+                                         "ancestor of end snapshot 4000")));
+  }
+}
+
+// Run every IncrementalAppendScanTest case with the parameter values 1, 2 and 3; the
+// tests read the value through GetParam() and pass it on as the format version.
+INSTANTIATE_TEST_SUITE_P(IncrementalAppendScanVersions, 
IncrementalAppendScanTest,
+                         testing::Values(1, 2, 3));
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/scan_test_base.h 
b/src/iceberg/test/scan_test_base.h
new file mode 100644
index 00000000..65a4e053
--- /dev/null
+++ b/src/iceberg/test/scan_test_base.h
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include <chrono>
+#include <format>
+#include <memory>
+#include <optional>
+#include <ranges>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "iceberg/arrow/arrow_file_io.h"
+#include "iceberg/avro/avro_register.h"
+#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/manifest/manifest_list.h"
+#include "iceberg/manifest/manifest_writer.h"
+#include "iceberg/partition_spec.h"
+#include "iceberg/schema.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/table_scan.h"
+#include "iceberg/test/matchers.h"
+#include "iceberg/transform.h"
+#include "iceberg/type.h"
+
+namespace iceberg {
+
+/// \brief Base class for scan-related tests providing common test utilities.
+///
+/// This class provides common setup and helper functions for testing
+/// TableScan and IncrementalScan implementations.
+class ScanTestBase : public testing::TestWithParam<int8_t> {
+ protected:
+  /// \brief Prepare the state shared by all helpers.
+  ///
+  /// Calls avro::RegisterAll(), creates a mock FileIO, a two-column schema
+  /// (id: int32, data: string) and two partition specs: the unpartitioned spec and
+  /// a spec with id 1 that buckets the "data" column into 16 buckets.
+  void SetUp() override {
+    avro::RegisterAll();
+
+    file_io_ = arrow::MakeMockFileIO();
+    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
+        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
+        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
+    unpartitioned_spec_ = PartitionSpec::Unpartitioned();
+
+    ICEBERG_UNWRAP_OR_FAIL(
+        partitioned_spec_,
+        PartitionSpec::Make(
+            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
+                                           "data_bucket_16_2", Transform::Bucket(16))}));
+  }
+
+  /// \brief Generate a unique manifest file path.
+  ///
+  /// The path combines a per-fixture counter with the current system clock tick
+  /// count.
+  std::string MakeManifestPath() {
+    return std::format("manifest-{}-{}.avro", manifest_counter_++,
+                       std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  /// \brief Generate a unique manifest list file path.
+  ///
+  /// The path combines a per-fixture counter with the current system clock tick
+  /// count.
+  std::string MakeManifestListPath() {
+    return std::format("manifest-list-{}-{}.avro", manifest_list_counter_++,
+                       std::chrono::system_clock::now().time_since_epoch().count());
+  }
+
+  /// \brief Create a manifest entry.
+  ///
+  /// \param status Status recorded in the entry.
+  /// \param snapshot_id Snapshot id recorded in the entry.
+  /// \param sequence_number Used for both the data and the file sequence number.
+  /// \param file Data file referenced by the entry.
+  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
+                          int64_t sequence_number, std::shared_ptr<DataFile> file) {
+    return ManifestEntry{
+        .status = status,
+        .snapshot_id = snapshot_id,
+        .sequence_number = sequence_number,
+        .file_sequence_number = sequence_number,
+        .data_file = std::move(file),
+    };
+  }
+
+  /// \brief Write a data manifest file.
+  ///
+  /// \param format_version Table format version; for versions >= 3 a first row id
+  /// of 0 is passed to the writer.
+  /// \param snapshot_id Snapshot id passed to the manifest writer.
+  /// \param entries Entries written to the manifest, in order.
+  /// \param spec Partition spec of the manifest (unpartitioned by default).
+  /// \return The manifest file reported by the writer after it was closed.
+  ManifestFile WriteDataManifest(
+      int8_t format_version, int64_t snapshot_id, std::vector<ManifestEntry> entries,
+      std::shared_ptr<PartitionSpec> spec = PartitionSpec::Unpartitioned()) {
+    const std::string manifest_path = MakeManifestPath();
+    return WriteEntriesAndClose(
+        ManifestWriter::MakeWriter(
+            format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
+            ManifestContent::kData,
+            /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L)
+                                                 : std::nullopt),
+        entries);
+  }
+
+  /// \brief Write a delete manifest file.
+  ///
+  /// \param format_version Table format version passed to the manifest writer.
+  /// \param snapshot_id Snapshot id passed to the manifest writer.
+  /// \param entries Entries written to the manifest, in order.
+  /// \param spec Partition spec of the manifest.
+  /// \return The manifest file reported by the writer after it was closed.
+  ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id,
+                                   std::vector<ManifestEntry> entries,
+                                   std::shared_ptr<PartitionSpec> spec) {
+    const std::string manifest_path = MakeManifestPath();
+    return WriteEntriesAndClose(
+        ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, file_io_,
+                                   spec, schema_, ManifestContent::kDeletes),
+        entries);
+  }
+
+  /// \brief Write a manifest list file.
+  ///
+  /// \param format_version Table format version; the sequence number is only passed
+  /// to the writer for versions >= 2 and a first row id of 0 only for versions >= 3.
+  /// \param snapshot_id Snapshot id passed to the manifest list writer.
+  /// \param parent_snapshot_id Parent snapshot id passed to the manifest list writer.
+  /// \param sequence_number Sequence number of the snapshot.
+  /// \param manifests Manifests added to the list.
+  /// \return Path of the written manifest list.
+  std::string WriteManifestList(int8_t format_version, int64_t snapshot_id,
+                                int64_t parent_snapshot_id, int64_t sequence_number,
+                                const std::vector<ManifestFile>& manifests) {
+    const std::string manifest_list_path = MakeManifestListPath();
+
+    auto writer_result = ManifestListWriter::MakeWriter(
+        format_version, snapshot_id, parent_snapshot_id, manifest_list_path, file_io_,
+        /*sequence_number=*/format_version >= 2 ? std::optional(sequence_number)
+                                                : std::nullopt,
+        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : std::nullopt);
+
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+    EXPECT_THAT(writer->AddAll(manifests), IsOk());
+    EXPECT_THAT(writer->Close(), IsOk());
+
+    return manifest_list_path;
+  }
+
+  /// \brief Extract file paths from scan tasks (same order as the tasks).
+  static std::vector<std::string> GetPaths(
+      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
+    return tasks | std::views::transform([](const auto& task) {
+             return task->data_file()->file_path;
+           }) |
+           std::ranges::to<std::vector<std::string>>();
+  }
+
+  /// \brief Create table metadata with the given snapshots.
+  ///
+  /// The format version is taken from the test parameter. The last sequence number
+  /// is the highest sequence number among the snapshots (0 when there are none), so
+  /// the result does not depend on the order in which the snapshots are passed.
+  ///
+  /// \param snapshots Snapshots stored in the metadata; entries must not be null.
+  /// \param current_snapshot_id Id stored as the current snapshot id.
+  /// \param refs Snapshot references (branches and tags) stored in the metadata.
+  /// \param default_spec Spec whose id becomes the default spec id; the
+  /// unpartitioned spec is used when null.
+  std::shared_ptr<TableMetadata> MakeTableMetadata(
+      const std::vector<std::shared_ptr<Snapshot>>& snapshots,
+      int64_t current_snapshot_id,
+      const std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>& refs = {},
+      std::shared_ptr<PartitionSpec> default_spec = nullptr) {
+    TimePointMs timestamp_ms = TimePointMsFromUnixMs(1609459200000L);
+    int64_t last_seq = 0L;
+    for (const auto& snapshot : snapshots) {
+      if (snapshot->sequence_number > last_seq) {
+        last_seq = snapshot->sequence_number;
+      }
+    }
+    auto effective_spec = default_spec ? default_spec : unpartitioned_spec_;
+
+    return std::make_shared<TableMetadata>(TableMetadata{
+        .format_version = GetParam(),
+        .table_uuid = "test-table-uuid",
+        .location = "/tmp/table",
+        .last_sequence_number = last_seq,
+        .last_updated_ms = timestamp_ms,
+        .last_column_id = 2,
+        .schemas = {schema_},
+        .current_schema_id = schema_->schema_id(),
+        .partition_specs = {partitioned_spec_, unpartitioned_spec_},
+        .default_spec_id = effective_spec->spec_id(),
+        .last_partition_id = 1000,
+        .current_snapshot_id = current_snapshot_id,
+        .snapshots = snapshots,
+        .snapshot_log = {},
+        .default_sort_order_id = 0,
+        .refs = refs,
+    });
+  }
+
+  /// \brief Create a Parquet data file description with optional partition values.
+  ///
+  /// \param path File path stored in the data file.
+  /// \param partition Partition values of the file (empty by default).
+  /// \param spec Spec whose id is stored in the file; the unpartitioned spec is
+  /// used when null.
+  /// \param record_count Record count stored in the data file.
+  std::shared_ptr<DataFile> MakeDataFile(
+      const std::string& path,
+      PartitionValues partition = PartitionValues(std::vector<Literal>{}),
+      std::shared_ptr<PartitionSpec> spec = nullptr, int64_t record_count = 1) {
+    auto effective_spec = spec ? spec : unpartitioned_spec_;
+    return std::make_shared<DataFile>(DataFile{
+        .file_path = path,
+        .file_format = FileFormatType::kParquet,
+        .partition = std::move(partition),
+        .record_count = record_count,
+        .file_size_in_bytes = 10,
+        .sort_order_id = 0,
+        .partition_spec_id = effective_spec->spec_id(),
+    });
+  }
+
+  /// \brief Create an append snapshot with the given files (string paths).
+  ///
+  /// Every file gets empty partition values; see
+  /// MakeAppendSnapshotWithPartitionValues for the remaining parameters.
+  std::shared_ptr<Snapshot> MakeAppendSnapshot(
+      int8_t format_version, int64_t snapshot_id,
+      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
+      const std::vector<std::string>& added_files,
+      std::shared_ptr<PartitionSpec> spec = nullptr) {
+    std::vector<std::pair<std::string, PartitionValues>> files_with_partitions;
+    files_with_partitions.reserve(added_files.size());
+    for (const auto& path : added_files) {
+      files_with_partitions.emplace_back(path, kEmptyPartition);
+    }
+    return MakeAppendSnapshotWithPartitionValues(format_version, snapshot_id,
+                                                 parent_snapshot_id, sequence_number,
+                                                 files_with_partitions, spec);
+  }
+
+  /// \brief Create an append snapshot with the given files (with partition values).
+  ///
+  /// Writes one data manifest containing an ADDED entry per file and a manifest
+  /// list referencing that manifest, then returns a snapshot pointing at the list.
+  ///
+  /// \param format_version Table format version used by the writers.
+  /// \param snapshot_id Id of the new snapshot.
+  /// \param parent_snapshot_id Parent of the new snapshot; when empty, 0 is passed
+  /// to the manifest list writer while the snapshot itself keeps no parent.
+  /// \param sequence_number Sequence number of the snapshot and of its entries; it
+  /// also offsets the snapshot timestamp by one second per unit.
+  /// \param added_files Path and partition values of each appended file.
+  /// \param spec Partition spec of the files; the unpartitioned spec is used when
+  /// null.
+  std::shared_ptr<Snapshot> MakeAppendSnapshotWithPartitionValues(
+      int8_t format_version, int64_t snapshot_id,
+      std::optional<int64_t> parent_snapshot_id, int64_t sequence_number,
+      const std::vector<std::pair<std::string, PartitionValues>>& added_files,
+      std::shared_ptr<PartitionSpec> spec = nullptr) {
+    auto effective_spec = spec ? spec : unpartitioned_spec_;
+    std::vector<ManifestEntry> entries;
+    entries.reserve(added_files.size());
+    for (const auto& [path, partition] : added_files) {
+      // Hand the freshly created data file straight to the entry (no extra copy).
+      entries.push_back(MakeEntry(ManifestStatus::kAdded, snapshot_id, sequence_number,
+                                  MakeDataFile(path, partition, effective_spec)));
+    }
+
+    auto manifest = WriteDataManifest(format_version, snapshot_id, std::move(entries),
+                                      effective_spec);
+    int64_t parent_id = parent_snapshot_id.value_or(0L);
+    auto manifest_list = WriteManifestList(format_version, snapshot_id, parent_id,
+                                           sequence_number, {manifest});
+    TimePointMs timestamp_ms =
+        TimePointMsFromUnixMs(1609459200000L + sequence_number * 1000);
+    return std::make_shared<Snapshot>(Snapshot{
+        .snapshot_id = snapshot_id,
+        .parent_snapshot_id = parent_snapshot_id,
+        .sequence_number = sequence_number,
+        .timestamp_ms = timestamp_ms,
+        .manifest_list = manifest_list,
+        .summary = {{"operation", "append"}},
+        .schema_id = schema_->schema_id(),
+    });
+  }
+
+  std::shared_ptr<FileIO> file_io_;
+  std::shared_ptr<Schema> schema_;
+  std::shared_ptr<PartitionSpec> partitioned_spec_;
+  std::shared_ptr<PartitionSpec> unpartitioned_spec_;
+
+ private:
+  /// \brief Write all entries with a freshly created manifest writer, close it and
+  /// return the manifest file it reports.
+  ///
+  /// Shared by WriteDataManifest and WriteDeleteManifest. Failures are reported as
+  /// non-fatal test failures; the results are still unwrapped with value()
+  /// afterwards.
+  template <typename WriterResult>
+  static ManifestFile WriteEntriesAndClose(WriterResult writer_result,
+                                           const std::vector<ManifestEntry>& entries) {
+    EXPECT_THAT(writer_result, IsOk());
+    auto writer = std::move(writer_result.value());
+
+    for (const auto& entry : entries) {
+      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
+    }
+
+    EXPECT_THAT(writer->Close(), IsOk());
+    auto manifest_result = writer->ToManifestFile();
+    EXPECT_THAT(manifest_result, IsOk());
+    return std::move(manifest_result.value());
+  }
+
+  int manifest_counter_ = 0;
+  int manifest_list_counter_ = 0;
+  constexpr static PartitionValues kEmptyPartition{};
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/test/table_scan_test.cc 
b/src/iceberg/test/table_scan_test.cc
index d9fc1e92..e4a3d21f 100644
--- a/src/iceberg/test/table_scan_test.cc
+++ b/src/iceberg/test/table_scan_test.cc
@@ -17,53 +17,26 @@
  * under the License.
  */
 
-#include "iceberg/table_scan.h"
-
-#include <chrono>
-#include <format>
 #include <memory>
 #include <optional>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "iceberg/arrow/arrow_file_io.h"
-#include "iceberg/avro/avro_register.h"
 #include "iceberg/expression/expressions.h"
-#include "iceberg/manifest/manifest_entry.h"
-#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/manifest/manifest_reader.h"
-#include "iceberg/manifest/manifest_writer.h"
-#include "iceberg/partition_spec.h"
-#include "iceberg/schema.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/table_metadata.h"
-#include "iceberg/test/matchers.h"
-#include "iceberg/transform.h"
-#include "iceberg/type.h"
-#include "iceberg/util/timepoint.h"
+#include "iceberg/test/scan_test_base.h"
 
 namespace iceberg {
 
-class TableScanTest : public testing::TestWithParam<int8_t> {
+class TableScanTest : public ScanTestBase {
  protected:
   void SetUp() override {
-    avro::RegisterAll();
-
-    file_io_ = arrow::MakeMockFileIO();
-    schema_ = std::make_shared<Schema>(std::vector<SchemaField>{
-        SchemaField::MakeRequired(/*field_id=*/1, "id", int32()),
-        SchemaField::MakeRequired(/*field_id=*/2, "data", string())});
-    unpartitioned_spec_ = PartitionSpec::Unpartitioned();
-
-    ICEBERG_UNWRAP_OR_FAIL(
-        partitioned_spec_,
-        PartitionSpec::Make(
-            /*spec_id=*/1, {PartitionField(/*source_id=*/2, /*field_id=*/1000,
-                                           "data_bucket_16_2", 
Transform::Bucket(16))}));
-
+    ScanTestBase::SetUp();
     MakeTableMetadata();
   }
 
@@ -134,12 +107,6 @@ class TableScanTest : public 
testing::TestWithParam<int8_t> {
     });
   }
 
-  std::string MakeManifestPath() {
-    static int counter = 0;
-    return std::format("manifest-{}-{}.avro", counter++,
-                       
std::chrono::system_clock::now().time_since_epoch().count());
-  }
-
   std::shared_ptr<DataFile> MakeDataFile(const std::string& path,
                                          const PartitionValues& partition,
                                          int32_t spec_id, int64_t record_count 
= 1,
@@ -164,84 +131,12 @@ class TableScanTest : public 
testing::TestWithParam<int8_t> {
     return file;
   }
 
-  ManifestEntry MakeEntry(ManifestStatus status, int64_t snapshot_id,
-                          int64_t sequence_number, std::shared_ptr<DataFile> 
file) {
-    return ManifestEntry{
-        .status = status,
-        .snapshot_id = snapshot_id,
-        .sequence_number = sequence_number,
-        .file_sequence_number = sequence_number,
-        .data_file = std::move(file),
-    };
-  }
-
-  ManifestFile WriteDataManifest(int8_t format_version, int64_t snapshot_id,
-                                 std::vector<ManifestEntry> entries,
-                                 std::shared_ptr<PartitionSpec> spec) {
-    const std::string manifest_path = MakeManifestPath();
-    auto writer_result = ManifestWriter::MakeWriter(
-        format_version, snapshot_id, manifest_path, file_io_, spec, schema_,
-        ManifestContent::kData,
-        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : 
std::nullopt);
-
-    EXPECT_THAT(writer_result, IsOk());
-    auto writer = std::move(writer_result.value());
-
-    for (const auto& entry : entries) {
-      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
-    }
-
-    EXPECT_THAT(writer->Close(), IsOk());
-    auto manifest_result = writer->ToManifestFile();
-    EXPECT_THAT(manifest_result, IsOk());
-    return std::move(manifest_result.value());
-  }
-
-  ManifestFile WriteDeleteManifest(int8_t format_version, int64_t snapshot_id,
-                                   std::vector<ManifestEntry> entries,
-                                   std::shared_ptr<PartitionSpec> spec) {
-    const std::string manifest_path = MakeManifestPath();
-    auto writer_result =
-        ManifestWriter::MakeWriter(format_version, snapshot_id, manifest_path, 
file_io_,
-                                   spec, schema_, ManifestContent::kDeletes);
-
-    EXPECT_THAT(writer_result, IsOk());
-    auto writer = std::move(writer_result.value());
-
-    for (const auto& entry : entries) {
-      EXPECT_THAT(writer->WriteEntry(entry), IsOk());
-    }
-
-    EXPECT_THAT(writer->Close(), IsOk());
-    auto manifest_result = writer->ToManifestFile();
-    EXPECT_THAT(manifest_result, IsOk());
-    return std::move(manifest_result.value());
-  }
-
-  std::string MakeManifestListPath() {
-    static int counter = 0;
-    return std::format("manifest-list-{}-{}.avro", counter++,
-                       
std::chrono::system_clock::now().time_since_epoch().count());
-  }
-
   std::string WriteManifestList(int8_t format_version, int64_t snapshot_id,
                                 int64_t sequence_number,
                                 const std::vector<ManifestFile>& manifests) {
-    const std::string manifest_list_path = MakeManifestListPath();
-    constexpr int64_t kParentSnapshotId = 0L;
-
-    auto writer_result = ManifestListWriter::MakeWriter(
-        format_version, snapshot_id, kParentSnapshotId, manifest_list_path, 
file_io_,
-        /*sequence_number=*/format_version >= 2 ? 
std::optional(sequence_number)
-                                                : std::nullopt,
-        /*first_row_id=*/format_version >= 3 ? std::optional<int64_t>(0L) : 
std::nullopt);
-
-    EXPECT_THAT(writer_result, IsOk());
-    auto writer = std::move(writer_result.value());
-    EXPECT_THAT(writer->AddAll(manifests), IsOk());
-    EXPECT_THAT(writer->Close(), IsOk());
-
-    return manifest_list_path;
+    return ScanTestBase::WriteManifestList(format_version, snapshot_id,
+                                           0L /*parent_snapshot_id*/, 
sequence_number,
+                                           manifests);
   }
 
   std::unordered_map<int32_t, std::shared_ptr<PartitionSpec>> GetSpecsById() {
@@ -249,18 +144,6 @@ class TableScanTest : public 
testing::TestWithParam<int8_t> {
             {unpartitioned_spec_->spec_id(), unpartitioned_spec_}};
   }
 
-  static std::vector<std::string> GetPaths(
-      const std::vector<std::shared_ptr<FileScanTask>>& tasks) {
-    return tasks | std::views::transform([](const auto& task) {
-             return task->data_file()->file_path;
-           }) |
-           std::ranges::to<std::vector<std::string>>();
-  }
-
-  std::shared_ptr<FileIO> file_io_;
-  std::shared_ptr<Schema> schema_;
-  std::shared_ptr<PartitionSpec> partitioned_spec_;
-  std::shared_ptr<PartitionSpec> unpartitioned_spec_;
   std::shared_ptr<TableMetadata> table_metadata_;
 };
 

Reply via email to