This is an automated email from the ASF dual-hosted git repository.

gangwu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new b7b5dd11 feat: extend table scan to support v2 deletes (#489)
b7b5dd11 is described below

commit b7b5dd11f0e6fbfe3b7b49c35f65b2651ac9fa5b
Author: Gang Wu <[email protected]>
AuthorDate: Tue Jan 6 18:03:04 2026 +0800

    feat: extend table scan to support v2 deletes (#489)
    
    Co-authored-by: Guotao Yu <[email protected]>
    Co-authored-by: Zehua Zou <[email protected]>
---
 example/demo_example.cc                   |   1 +
 src/iceberg/table.cc                      |   2 +-
 src/iceberg/table_scan.cc                 | 380 +++++++++++++++++++++---------
 src/iceberg/table_scan.h                  | 325 ++++++++++++++++---------
 src/iceberg/test/file_scan_task_test.cc   |   6 +-
 src/iceberg/util/snapshot_util.cc         |  31 ++-
 src/iceberg/util/snapshot_util_internal.h |   8 +
 7 files changed, 522 insertions(+), 231 deletions(-)

diff --git a/example/demo_example.cc b/example/demo_example.cc
index ab011fee..6869aa37 100644
--- a/example/demo_example.cc
+++ b/example/demo_example.cc
@@ -22,6 +22,7 @@
 #include "iceberg/arrow/arrow_file_io.h"
 #include "iceberg/avro/avro_register.h"
 #include "iceberg/catalog/memory/in_memory_catalog.h"
+#include "iceberg/manifest/manifest_entry.h"
 #include "iceberg/parquet/parquet_register.h"
 #include "iceberg/table.h"
 #include "iceberg/table_scan.h"
diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc
index ee3ce594..f2e6d320 100644
--- a/src/iceberg/table.cc
+++ b/src/iceberg/table.cc
@@ -141,7 +141,7 @@ const std::shared_ptr<TableMetadata>& Table::metadata() 
const { return metadata_
 const std::shared_ptr<Catalog>& Table::catalog() const { return catalog_; }
 
 Result<std::unique_ptr<TableScanBuilder>> Table::NewScan() const {
-  return std::make_unique<TableScanBuilder>(metadata_, io_);
+  return TableScanBuilder::Make(metadata_, io_);
 }
 
 Result<std::shared_ptr<Transaction>> Table::NewTransaction() {
diff --git a/src/iceberg/table_scan.cc b/src/iceberg/table_scan.cc
index 700cab1f..12410270 100644
--- a/src/iceberg/table_scan.cc
+++ b/src/iceberg/table_scan.cc
@@ -20,22 +20,40 @@
 #include "iceberg/table_scan.h"
 
 #include <cstring>
-#include <vector>
 
-#include "iceberg/arrow_c_data.h"
+#include "iceberg/expression/expression.h"
 #include "iceberg/file_reader.h"
 #include "iceberg/manifest/manifest_entry.h"
-#include "iceberg/manifest/manifest_list.h"
-#include "iceberg/manifest/manifest_reader.h"
+#include "iceberg/manifest/manifest_group.h"
+#include "iceberg/result.h"
 #include "iceberg/schema.h"
-#include "iceberg/schema_field.h"
 #include "iceberg/snapshot.h"
 #include "iceberg/table_metadata.h"
 #include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+#include "iceberg/util/timepoint.h"
 
 namespace iceberg {
 
 namespace {
+
+const std::vector<std::string> kScanColumns = {
+    "snapshot_id",         "file_path",          "file_ordinal",  
"file_format",
+    "block_size_in_bytes", "file_size_in_bytes", "record_count",  "partition",
+    "key_metadata",        "split_offsets",      "sort_order_id",
+};
+
+const std::vector<std::string> kStatsColumns = {
+    "value_counts", "null_value_counts", "nan_value_counts",
+    "lower_bounds", "upper_bounds",      "column_sizes",
+};
+
+const std::vector<std::string> kScanColumnsWithStats = [] {
+  auto cols = kScanColumns;
+  cols.insert(cols.end(), kStatsColumns.begin(), kStatsColumns.end());
+  return cols;
+}();
+
 /// \brief Private data structure to hold the Reader and error state
 struct ReaderStreamPrivateData {
   std::unique_ptr<Reader> reader;
@@ -135,6 +153,25 @@ Result<ArrowArrayStream> 
MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
 
 }  // namespace
 
+namespace internal {
+
+Status TableScanContext::Validate() const {
+  ICEBERG_CHECK(columns_to_keep_stats.empty() || return_column_stats,
+                "Cannot select columns to keep stats when column stats are not 
returned");
+  ICEBERG_CHECK(projected_schema == nullptr || selected_columns.empty(),
+                "Cannot set projection schema and selected columns at the same 
time");
+  ICEBERG_CHECK(!snapshot_id.has_value() ||
+                    (!from_snapshot_id.has_value() && 
!to_snapshot_id.has_value()),
+                "Cannot mix snapshot scan and incremental scan");
+  ICEBERG_CHECK(!min_rows_requested.has_value() || min_rows_requested.value() 
>= 0,
+                "Min rows requested cannot be negative");
+  return {};
+}
+
+}  // namespace internal
+
+ScanTask::~ScanTask() = default;
+
 // FileScanTask implementation
 
 FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
@@ -142,22 +179,10 @@ FileScanTask::FileScanTask(std::shared_ptr<DataFile> 
data_file,
                            std::shared_ptr<Expression> residual_filter)
     : data_file_(std::move(data_file)),
       delete_files_(std::move(delete_files)),
-      residual_filter_(std::move(residual_filter)) {}
-
-const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return 
data_file_; }
-
-const std::vector<std::shared_ptr<DataFile>>& FileScanTask::delete_files() 
const {
-  return delete_files_;
-}
-
-const std::shared_ptr<Expression>& FileScanTask::residual_filter() const {
-  return residual_filter_;
+      residual_filter_(std::move(residual_filter)) {
+  ICEBERG_DCHECK(data_file_ != nullptr, "Data file cannot be null for 
FileScanTask");
 }
 
-bool FileScanTask::has_deletes() const { return !delete_files_.empty(); }
-
-bool FileScanTask::has_residual_filter() const { return residual_filter_ != 
nullptr; }
-
 int64_t FileScanTask::size_bytes() const { return 
data_file_->file_size_in_bytes; }
 
 int32_t FileScanTask::files_count() const { return 1; }
@@ -165,17 +190,16 @@ int32_t FileScanTask::files_count() const { return 1; }
 int64_t FileScanTask::estimated_row_count() const { return 
data_file_->record_count; }
 
 Result<ArrowArrayStream> FileScanTask::ToArrow(
-    const std::shared_ptr<FileIO>& io, const std::shared_ptr<Schema>& 
projected_schema,
-    const std::shared_ptr<Expression>& filter) const {
-  if (has_deletes()) {
+    const std::shared_ptr<FileIO>& io, std::shared_ptr<Schema> 
projected_schema) const {
+  if (!delete_files_.empty()) {
     return NotSupported("Reading data files with delete files is not yet 
supported.");
   }
 
   const ReaderOptions options{.path = data_file_->file_path,
                               .length = data_file_->file_size_in_bytes,
                               .io = io,
-                              .projection = projected_schema,
-                              .filter = filter};
+                              .projection = std::move(projected_schema),
+                              .filter = residual_filter_};
 
   ICEBERG_ASSIGN_OR_RAISE(auto reader,
                           ReaderFactoryRegistry::Open(data_file_->file_format, 
options));
@@ -183,138 +207,272 @@ Result<ArrowArrayStream> FileScanTask::ToArrow(
   return MakeArrowArrayStream(std::move(reader));
 }
 
+Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
+    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
+  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+  return std::unique_ptr<TableScanBuilder>(
+      new TableScanBuilder(std::move(metadata), std::move(io)));
+}
+
 TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata> 
table_metadata,
                                    std::shared_ptr<FileIO> file_io)
-    : file_io_(std::move(file_io)) {
-  context_.table_metadata = std::move(table_metadata);
-}
+    : metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}
 
-TableScanBuilder& TableScanBuilder::WithColumnNames(
-    std::vector<std::string> column_names) {
-  column_names_ = std::move(column_names);
+TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value) 
{
+  context_.options[std::move(key)] = std::move(value);
   return *this;
 }
 
-TableScanBuilder& 
TableScanBuilder::WithProjectedSchema(std::shared_ptr<Schema> schema) {
+TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
   context_.projected_schema = std::move(schema);
   return *this;
 }
 
-TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
-  snapshot_id_ = snapshot_id;
+TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
+  context_.case_sensitive = case_sensitive;
   return *this;
 }
 
-TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression> 
filter) {
+TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
+  context_.return_column_stats = true;
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::IncludeColumnStats(
+    const std::vector<std::string>& requested_columns) {
+  context_.return_column_stats = true;
+  context_.columns_to_keep_stats.clear();
+  context_.columns_to_keep_stats.reserve(requested_columns.size());
+
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema());
+  const auto& schema = schema_ref.get();
+  for (const auto& column_name : requested_columns) {
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field, 
schema->FindFieldByName(column_name));
+    if (field.has_value()) {
+      context_.columns_to_keep_stats.insert(field.value().get().field_id());
+    }
+  }
+
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>& 
column_names) {
+  context_.selected_columns = column_names;
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter) 
{
   context_.filter = std::move(filter);
   return *this;
 }
 
-TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) {
-  context_.case_sensitive = case_sensitive;
+TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
+  context_.ignore_residuals = true;
   return *this;
 }
 
-TableScanBuilder& TableScanBuilder::WithOption(std::string property, 
std::string value) {
-  context_.options[std::move(property)] = std::move(value);
+TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
+  context_.min_rows_requested = num_rows;
   return *this;
 }
 
-TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) {
-  context_.limit = limit;
+TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
+                        "Cannot override snapshot, already set snapshot id={}",
+                        context_.snapshot_id.value());
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, 
metadata_->SnapshotById(snapshot_id));
+  context_.snapshot_id = snapshot_id;
   return *this;
 }
 
-Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
-  const auto& table_metadata = context_.table_metadata;
-  auto snapshot_id = snapshot_id_ ? snapshot_id_ : 
table_metadata->current_snapshot_id;
-  if (!snapshot_id) {
-    return InvalidArgument("No snapshot ID specified for table {}",
-                           table_metadata->table_uuid);
+TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
+  if (ref == SnapshotRef::kMainBranch) {
+    snapshot_schema_ = nullptr;
+    context_.snapshot_id.reset();
+    return *this;
   }
-  ICEBERG_ASSIGN_OR_RAISE(context_.snapshot, 
table_metadata->SnapshotById(*snapshot_id));
 
-  if (!context_.projected_schema) {
-    const auto& snapshot = context_.snapshot;
-    auto schema_id = table_metadata->current_schema_id;
-    ICEBERG_ASSIGN_OR_RAISE(auto schema, 
table_metadata->SchemaById(schema_id));
+  ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
+                        "Cannot override ref, already set snapshot id={}",
+                        context_.snapshot_id.value());
+  auto iter = metadata_->refs.find(ref);
+  ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref {}", 
ref);
+  ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+  int32_t snapshot_id = iter->second->snapshot_id;
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore, 
metadata_->SnapshotById(snapshot_id));
+  context_.snapshot_id = snapshot_id;
+
+  return *this;
+}
+
+TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto time_point_ms,
+                                   TimePointMsFromUnixMs(timestamp_millis));
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(
+      auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_, 
time_point_ms));
+  return UseSnapshot(snapshot_id);
+}
+
+TableScanBuilder& TableScanBuilder::FromSnapshot(
+    [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool 
inclusive) {
+  return AddError(NotImplemented("Incremental scan is not implemented"));
+}
 
-    if (column_names_.empty()) {
-      context_.projected_schema = schema;
+TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const 
std::string& ref,
+                                                 [[maybe_unused]] bool 
inclusive) {
+  return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t 
to_snapshot_id) {
+  return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const 
std::string& ref) {
+  return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
+  context_.branch = branch;
+  return *this;
+}
+
+Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
+TableScanBuilder::ResolveSnapshotSchema() {
+  if (snapshot_schema_ == nullptr) {
+    if (context_.snapshot_id.has_value()) {
+      ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
+                              metadata_->SnapshotById(*context_.snapshot_id));
+      int32_t schema_id = 
snapshot->schema_id.value_or(Schema::kInitialSchemaId);
+      ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, 
metadata_->SchemaById(schema_id));
     } else {
-      // TODO(gty404): collect touched columns from filter expression
-      std::vector<SchemaField> projected_fields;
-      projected_fields.reserve(column_names_.size());
-      for (const auto& column_name : column_names_) {
-        // TODO(gty404): support case-insensitive column names
-        auto field_opt = schema->GetFieldByName(column_name);
-        if (!field_opt) {
-          return InvalidArgument("Column {} not found in schema '{}'", 
column_name,
-                                 schema_id);
-        }
-        projected_fields.emplace_back(field_opt.value()->get());
-      }
-      context_.projected_schema =
-          std::make_shared<Schema>(std::move(projected_fields), 
schema->schema_id());
+      ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->Schema());
     }
-  } else if (!column_names_.empty()) {
-    return InvalidArgument(
-        "Cannot specify column names when a projected schema is provided");
   }
+  ICEBERG_CHECK(snapshot_schema_ != nullptr, "Snapshot schema is null");
+  return snapshot_schema_;
+}
+
+bool TableScanBuilder::IsIncrementalScan() const {
+  return context_.from_snapshot_id.has_value() || 
context_.to_snapshot_id.has_value();
+}
+
+Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  ICEBERG_RETURN_UNEXPECTED(context_.Validate());
+
+  if (IsIncrementalScan()) {
+    return NotImplemented("Incremental scan is not yet implemented");
+  }
+
+  ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
+  return DataTableScan::Make(metadata_, schema.get(), io_, 
std::move(context_));
+}
+
+TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
+                     std::shared_ptr<Schema> schema, std::shared_ptr<FileIO> 
file_io,
+                     internal::TableScanContext context)
+    : metadata_(std::move(metadata)),
+      schema_(std::move(schema)),
+      io_(std::move(file_io)),
+      context_(std::move(context)) {}
+
+TableScan::~TableScan() = default;
 
-  return std::make_unique<DataTableScan>(std::move(context_), file_io_);
+const std::shared_ptr<TableMetadata>& TableScan::metadata() const { return 
metadata_; }
+
+Result<std::shared_ptr<Snapshot>> TableScan::snapshot() const {
+  auto snapshot_id = context_.snapshot_id ? context_.snapshot_id.value()
+                                          : metadata_->current_snapshot_id;
+  return metadata_->SnapshotById(snapshot_id);
+}
+
+Result<std::shared_ptr<Schema>> TableScan::schema() const {
+  return ResolveProjectedSchema();
+}
+
+const internal::TableScanContext& TableScan::context() const { return 
context_; }
+
+const std::shared_ptr<FileIO>& TableScan::io() const { return io_; }
+
+const std::shared_ptr<Expression>& TableScan::filter() const {
+  const static std::shared_ptr<Expression> true_expr = True::Instance();
+  if (!context_.filter) {
+    return true_expr;
+  }
+  return context_.filter;
 }
 
-TableScan::TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
-    : context_(std::move(context)), file_io_(std::move(file_io)) {}
+bool TableScan::is_case_sensitive() const { return context_.case_sensitive; }
+
+Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
+TableScan::ResolveProjectedSchema() const {
+  if (projected_schema_ != nullptr) {
+    return projected_schema_;
+  }
 
-const std::shared_ptr<Snapshot>& TableScan::snapshot() const { return 
context_.snapshot; }
+  if (!context_.selected_columns.empty()) {
+    /// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field 
ids
+    /// from selected column names and bound references in the filter, and 
then create
+    /// projected schema based on the collected field ids.
+    return NotImplemented(
+        "Selecting columns by name to create projected schema is not yet 
implemented");
+  } else if (context_.projected_schema != nullptr) {
+    projected_schema_ = context_.projected_schema;
+  } else {
+    projected_schema_ = schema_;
+  }
 
-const std::shared_ptr<Schema>& TableScan::projection() const {
-  return context_.projected_schema;
+  return projected_schema_;
 }
 
-const TableScanContext& TableScan::context() const { return context_; }
+const std::vector<std::string>& TableScan::ScanColumns() const {
+  return context_.return_column_stats ? kScanColumnsWithStats : kScanColumns;
+}
 
-const std::shared_ptr<FileIO>& TableScan::io() const { return file_io_; }
+Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
+    std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+    std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+  ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+  ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+  ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+  return std::unique_ptr<DataTableScan>(new DataTableScan(
+      std::move(metadata), std::move(schema), std::move(io), 
std::move(context)));
+}
 
-DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr<FileIO> 
file_io)
-    : TableScan(std::move(context), std::move(file_io)) {}
+DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
+                             std::shared_ptr<Schema> schema, 
std::shared_ptr<FileIO> io,
+                             internal::TableScanContext context)
+    : TableScan(std::move(metadata), std::move(schema), std::move(io),
+                std::move(context)) {}
 
 Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles() 
const {
-  ICEBERG_ASSIGN_OR_RAISE(
-      auto manifest_list_reader,
-      ManifestListReader::Make(context_.snapshot->manifest_list, file_io_));
-  ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
-
-  std::vector<std::shared_ptr<FileScanTask>> tasks;
-  ICEBERG_ASSIGN_OR_RAISE(auto partition_spec, 
context_.table_metadata->PartitionSpec());
-
-  // Get the table schema
-  ICEBERG_ASSIGN_OR_RAISE(auto current_schema, 
context_.table_metadata->Schema());
-
-  for (const auto& manifest_file : manifest_files) {
-    ICEBERG_ASSIGN_OR_RAISE(
-        auto manifest_reader,
-        ManifestReader::Make(manifest_file, file_io_, current_schema, 
partition_spec));
-    ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
-
-    // TODO(gty404): filter manifests using partition spec and filter 
expression
-
-    for (auto& manifest_entry : manifests) {
-      const auto& data_file = manifest_entry.data_file;
-      switch (data_file->content) {
-        case DataFile::Content::kData:
-          
tasks.emplace_back(std::make_shared<FileScanTask>(manifest_entry.data_file));
-          break;
-        case DataFile::Content::kPositionDeletes:
-        case DataFile::Content::kEqualityDeletes:
-          return NotSupported("Equality/Position deletes are not supported in 
data scan");
-      }
-    }
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
+  if (!snapshot) {
+    return std::vector<std::shared_ptr<FileScanTask>>{};
   }
 
-  return tasks;
+  TableMetadataCache metadata_cache(metadata_.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id, 
metadata_cache.GetPartitionSpecsById());
+
+  SnapshotCache snapshot_cache(snapshot.get());
+  ICEBERG_ASSIGN_OR_RAISE(auto data_manifests, 
snapshot_cache.DataManifests(io_));
+  ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests, 
snapshot_cache.DeleteManifests(io_));
+
+  ICEBERG_ASSIGN_OR_RAISE(
+      auto manifest_group,
+      ManifestGroup::Make(io_, schema_, specs_by_id,
+                          {data_manifests.begin(), data_manifests.end()},
+                          {delete_manifests.begin(), delete_manifests.end()}));
+  manifest_group->CaseSensitive(context_.case_sensitive)
+      .Select(ScanColumns())
+      .FilterData(filter())
+      .IgnoreDeleted()
+      .ColumnsToKeepStats(context_.columns_to_keep_stats);
+  if (context_.ignore_residuals) {
+    manifest_group->IgnoreResiduals();
+  }
+  return manifest_group->PlanFiles();
 }
 
 }  // namespace iceberg
diff --git a/src/iceberg/table_scan.h b/src/iceberg/table_scan.h
index 4f2ddfde..cc26830f 100644
--- a/src/iceberg/table_scan.h
+++ b/src/iceberg/table_scan.h
@@ -19,19 +19,30 @@
 
 #pragma once
 
+#include <functional>
+#include <memory>
+#include <optional>
 #include <string>
+#include <unordered_map>
+#include <unordered_set>
 #include <vector>
 
 #include "iceberg/arrow_c_data.h"
-#include "iceberg/manifest/manifest_entry.h"
+#include "iceberg/result.h"
 #include "iceberg/type_fwd.h"
+#include "iceberg/util/error_collector.h"
 
 namespace iceberg {
 
 /// \brief An abstract scan task.
 class ICEBERG_EXPORT ScanTask {
  public:
-  virtual ~ScanTask() = default;
+  enum class Kind : uint8_t {
+    kFileScanTask,
+  };
+
+  /// \brief The kind of scan task.
+  virtual Kind kind() const = 0;
 
   /// \brief The number of bytes that should be read by this scan task.
   virtual int64_t size_bytes() const = 0;
@@ -41,6 +52,8 @@ class ICEBERG_EXPORT ScanTask {
 
   /// \brief The number of rows that should be read by this scan task.
   virtual int64_t estimated_row_count() const = 0;
+
+  virtual ~ScanTask();
 };
 
 /// \brief Task representing a data file and its corresponding delete files.
@@ -50,175 +63,271 @@ class ICEBERG_EXPORT FileScanTask : public ScanTask {
   ///
   /// \param data_file The data file to read.
   /// \param delete_files Delete files that apply to this data file.
-  /// \param residual_filter Optional residual filter to apply after reading.
+  /// \param filter Optional residual filter to apply after reading.
   explicit FileScanTask(std::shared_ptr<DataFile> data_file,
                         std::vector<std::shared_ptr<DataFile>> delete_files = 
{},
-                        std::shared_ptr<Expression> residual_filter = nullptr);
+                        std::shared_ptr<Expression> filter = nullptr);
 
   /// \brief The data file that should be read by this scan task.
-  const std::shared_ptr<DataFile>& data_file() const;
+  const std::shared_ptr<DataFile>& data_file() const { return data_file_; }
 
   /// \brief Delete files that apply to this data file.
-  const std::vector<std::shared_ptr<DataFile>>& delete_files() const;
+  const std::vector<std::shared_ptr<DataFile>>& delete_files() const {
+    return delete_files_;
+  }
 
   /// \brief Residual filter to apply after reading.
-  const std::shared_ptr<Expression>& residual_filter() const;
-
-  /// \brief Check if any deletes need to be applied.
-  bool has_deletes() const;
-
-  /// \brief Check if a residual filter needs to be applied.
-  bool has_residual_filter() const;
+  const std::shared_ptr<Expression>& residual_filter() const { return 
residual_filter_; }
 
+  Kind kind() const override { return Kind::kFileScanTask; }
   int64_t size_bytes() const override;
   int32_t files_count() const override;
   int64_t estimated_row_count() const override;
 
-  /**
-   * \brief Returns a C-ABI compatible ArrowArrayStream to read the data for 
this task.
-   *
-   * \param io The FileIO instance for accessing the file data.
-   * \param projected_schema The projected schema for reading the data.
-   * \param filter Optional filter expression to apply during reading.
-   * \return A Result containing an ArrowArrayStream, or an error on failure.
-   */
+  /// TODO(gangwu): move it to iceberg/data/task_scanner.h
+  ///
+  /// \brief Returns a C-ABI compatible ArrowArrayStream to read the data for 
this task.
+  ///
+  /// \param io The FileIO instance for accessing the file data.
+  /// \param projected_schema The projected schema for reading the data.
+  /// \return A Result containing an ArrowArrayStream, or an error on failure.
   Result<ArrowArrayStream> ToArrow(const std::shared_ptr<FileIO>& io,
-                                   const std::shared_ptr<Schema>& 
projected_schema,
-                                   const std::shared_ptr<Expression>& filter) 
const;
+                                   std::shared_ptr<Schema> projected_schema) 
const;
 
  private:
-  /// \brief Data file metadata.
   std::shared_ptr<DataFile> data_file_;
-  /// \brief Delete files that apply to this data file.
   std::vector<std::shared_ptr<DataFile>> delete_files_;
-  /// \brief Residual filter to apply after reading.
   std::shared_ptr<Expression> residual_filter_;
 };
 
-/// \brief Scan context holding snapshot and scan-specific metadata.
+namespace internal {
+
+// Internal table scan context used by different scan implementations.
 struct TableScanContext {
-  /// \brief Table metadata.
-  std::shared_ptr<TableMetadata> table_metadata;
-  /// \brief Snapshot to scan.
-  std::shared_ptr<Snapshot> snapshot;
-  /// \brief Projected schema.
-  std::shared_ptr<Schema> projected_schema;
-  /// \brief Filter expression to apply.
+  std::optional<int64_t> snapshot_id;
   std::shared_ptr<Expression> filter;
-  /// \brief Whether the scan is case-sensitive.
-  bool case_sensitive = false;
-  /// \brief Additional options for the scan.
+  bool ignore_residuals{false};
+  bool case_sensitive{true};
+  bool return_column_stats{false};
+  std::unordered_set<int32_t> columns_to_keep_stats;
+  std::vector<std::string> selected_columns;
+  std::shared_ptr<Schema> projected_schema;
   std::unordered_map<std::string, std::string> options;
-  /// \brief Optional limit on the number of rows to scan.
-  std::optional<int64_t> limit;
+  bool from_snapshot_id_inclusive{false};
+  std::optional<int64_t> from_snapshot_id;
+  std::optional<int64_t> to_snapshot_id;
+  std::string branch{};
+  std::optional<int64_t> min_rows_requested;
+
+  // Validate the context parameters to see if they have conflicts.
+  [[nodiscard]] Status Validate() const;
 };
 
+}  // namespace internal
+
 /// \brief Builder class for creating TableScan instances.
-class ICEBERG_EXPORT TableScanBuilder {
+class ICEBERG_EXPORT TableScanBuilder : public ErrorCollector {
  public:
   /// \brief Constructs a TableScanBuilder for the given table.
-  /// \param table_metadata The metadata of the table to scan.
-  /// \param file_io The FileIO instance for reading manifests and data files.
-  explicit TableScanBuilder(std::shared_ptr<TableMetadata> table_metadata,
-                            std::shared_ptr<FileIO> file_io);
-
-  /// \brief Sets the snapshot ID to scan.
-  /// \param snapshot_id The ID of the snapshot.
-  /// \return Reference to the builder.
-  TableScanBuilder& WithSnapshotId(int64_t snapshot_id);
-
-  /// \brief Selects columns to include in the scan.
-  /// \param column_names A list of column names. If empty, all columns will 
be selected.
-  /// \return Reference to the builder.
-  TableScanBuilder& WithColumnNames(std::vector<std::string> column_names);
-
-  /// \brief Sets the schema to use for the scan.
-  /// \param schema The schema to use.
-  /// \return Reference to the builder.
-  TableScanBuilder& WithProjectedSchema(std::shared_ptr<Schema> schema);
-
-  /// \brief Applies a filter expression to the scan.
-  /// \param filter Filter expression to use.
-  /// \return Reference to the builder.
-  TableScanBuilder& WithFilter(std::shared_ptr<Expression> filter);
-
-  /// \brief Sets whether the scan should be case-sensitive.
-  /// \param case_sensitive Whether the scan is case-sensitive.
-  /// /return Reference to the builder.
-  TableScanBuilder& WithCaseSensitive(bool case_sensitive);
-
-  /// \brief Sets an option for the scan.
-  /// \param property The name of the option.
-  /// \param value The value of the option.
-  /// \return Reference to the builder.
-  TableScanBuilder& WithOption(std::string property, std::string value);
-
-  /// \brief Sets an optional limit on the number of rows to scan.
-  /// \param limit Optional limit on the number of rows.
-  /// \return Reference to the builder.
-  TableScanBuilder& WithLimit(std::optional<int64_t> limit);
+  /// \param metadata Current table metadata.
+  /// \param io FileIO instance for reading manifests files.
+  static Result<std::unique_ptr<TableScanBuilder>> Make(
+      std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io);
+
+  /// \brief Update property that will override the table's behavior
+  /// based on the incoming pair. Unknown properties will be ignored.
+  /// \param key name of the table property to be overridden
+  /// \param value value to override with
+  TableScanBuilder& Option(std::string key, std::string value);
+
+  /// \brief Set the projected schema.
+  /// \param schema a projection schema
+  TableScanBuilder& Project(std::shared_ptr<Schema> schema);
+
+  /// \brief If data columns are selected via Select(), controls whether
+  /// the match to the schema will be done with case sensitivity. Default is 
true.
+  /// \param case_sensitive whether the scan is case-sensitive
+  TableScanBuilder& CaseSensitive(bool case_sensitive);
+
+  /// \brief Request this scan to load the column stats with each data file.
+  ///
+  /// Column stats include: value count, null value count, lower bounds, and 
upper bounds.
+  TableScanBuilder& IncludeColumnStats();
+
+  /// \brief Request this scan to load the column stats for the specific 
columns with each
+  /// data file.
+  ///
+  /// Column stats include: value count, null value count, lower bounds, and 
upper bounds.
+  ///
+  /// \param requested_columns column names for which to keep the stats.
+  TableScanBuilder& IncludeColumnStats(const std::vector<std::string>& 
requested_columns);
+
+  /// \brief Request this scan to read the given data columns.
+  ///
+  /// This produces an expected schema that includes all fields that are 
either selected
+  /// or used by this scan's filter expression.
+  ///
+  /// \param column_names column names from the table's schema
+  TableScanBuilder& Select(const std::vector<std::string>& column_names);
+
+  /// \brief Set the expression to filter data.
+  /// \param filter a filter expression
+  TableScanBuilder& Filter(std::shared_ptr<Expression> filter);
+
+  /// \brief Request data filtering to files but not to rows in those files.
+  TableScanBuilder& IgnoreResiduals();
+
+  /// \brief Request this scan to return at least the given number of rows.
+  ///
+  /// This is used as a hint and is entirely optional in order to not have to 
return more
+  /// rows than necessary. This may return fewer rows if the scan does not 
contain that
+  /// many, or it may return more than requested.
+  ///
+  /// \param num_rows The minimum number of rows requested
+  TableScanBuilder& MinRowsRequested(int64_t num_rows);
+
+  /// \brief Request this scan to use the given snapshot by ID.
+  /// \param snapshot_id a snapshot ID
+  /// \note InvalidArgument will be returned if the snapshot cannot be found
+  TableScanBuilder& UseSnapshot(int64_t snapshot_id);
+
+  /// \brief Request this scan to use the given reference.
+  /// \param ref reference
+  /// \note InvalidArgument will be returned if a reference with the given name
+  /// could not be found
+  TableScanBuilder& UseRef(const std::string& ref);
+
+  /// \brief Request this scan to use the most recent snapshot as of the given 
time
+  /// in milliseconds on the branch in the scan or main if no branch is set.
+  /// \param timestamp_millis a timestamp in milliseconds.
+  /// \note InvalidArgument will be returned if the snapshot cannot be found 
or time
+  /// travel is attempted on a tag
+  TableScanBuilder& AsOfTime(int64_t timestamp_millis);
+
+  /// \brief Instructs this scan to look for changes starting from a 
particular snapshot.
+  ///
+  /// If the start snapshot is not configured, it defaults to the oldest 
ancestor of the
+  /// end snapshot (inclusive).
+  ///
+  /// \param from_snapshot_id the start snapshot ID
+  /// \param inclusive whether the start snapshot is inclusive, default is 
false
+  /// \note InvalidArgument will be returned if the start snapshot is not an 
ancestor of
+  /// the end snapshot
+  TableScanBuilder& FromSnapshot(int64_t from_snapshot_id, bool inclusive = 
false);
+
+  /// \brief Instructs this scan to look for changes starting from a 
particular snapshot.
+  ///
+  /// If the start snapshot is not configured, it defaults to the oldest 
ancestor of the
+  /// end snapshot (inclusive).
+  ///
+  /// \param ref the start ref name that points to a particular snapshot ID
+  /// \param inclusive whether the start snapshot is inclusive, default is 
false
+  /// \note InvalidArgument will be returned if the start snapshot is not an 
ancestor of
+  /// the end snapshot
+  TableScanBuilder& FromSnapshot(const std::string& ref, bool inclusive = 
false);
+
+  /// \brief Instructs this scan to look for changes up to a particular 
snapshot
+  /// (inclusive).
+  ///
+  /// If the end snapshot is not configured, it defaults to the current table 
snapshot
+  /// (inclusive).
+  ///
+  /// \param to_snapshot_id the end snapshot ID (inclusive)
+  TableScanBuilder& ToSnapshot(int64_t to_snapshot_id);
+
+  /// \brief Instructs this scan to look for changes up to a particular 
snapshot ref
+  /// (inclusive).
+  ///
+  /// If the end snapshot is not configured, it defaults to the current table 
snapshot
+  /// (inclusive).
+  ///
+  /// \param ref the end snapshot Ref (inclusive)
+  TableScanBuilder& ToSnapshot(const std::string& ref);
+
+  /// \brief Use the specified branch
+  /// \param branch the branch name
+  TableScanBuilder& UseBranch(const std::string& branch);
 
   /// \brief Builds and returns a TableScan instance.
   /// \return A Result containing the TableScan or an error.
   Result<std::unique_ptr<TableScan>> Build();
 
  private:
-  /// \brief the file I/O instance for reading manifests and data files.
-  std::shared_ptr<FileIO> file_io_;
-  /// \brief column names to project in the scan.
-  std::vector<std::string> column_names_;
-  /// \brief snapshot ID to scan, if specified.
-  std::optional<int64_t> snapshot_id_;
-  /// \brief Context for the scan, including snapshot, schema, and filter.
-  TableScanContext context_;
+  TableScanBuilder(std::shared_ptr<TableMetadata> metadata, 
std::shared_ptr<FileIO> io);
+
+  // Return the schema bound to the specified snapshot.
+  Result<std::reference_wrapper<const std::shared_ptr<Schema>>> 
ResolveSnapshotSchema();
+
+  // Return whether current configuration indicates an incremental scan mode.
+  bool IsIncrementalScan() const;
+
+  std::shared_ptr<TableMetadata> metadata_;
+  std::shared_ptr<FileIO> io_;
+  internal::TableScanContext context_;
+  std::shared_ptr<Schema> snapshot_schema_;
 };
 
 /// \brief Represents a configured scan operation on a table.
 class ICEBERG_EXPORT TableScan {
  public:
-  virtual ~TableScan() = default;
+  virtual ~TableScan();
 
-  /// \brief Constructs a TableScan with the given context and file I/O.
-  /// \param context Scan context including snapshot, schema, and filter.
-  /// \param file_io File I/O instance for reading manifests and data files.
-  TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io);
+  /// \brief Returns the table metadata being scanned.
+  const std::shared_ptr<TableMetadata>& metadata() const;
 
-  /// \brief Returns the snapshot being scanned.
-  /// \return A shared pointer to the snapshot.
-  const std::shared_ptr<Snapshot>& snapshot() const;
+  /// \brief Returns the snapshot to scan.
+  Result<std::shared_ptr<Snapshot>> snapshot() const;
 
   /// \brief Returns the projected schema for the scan.
-  /// \return A shared pointer to the projected schema.
-  const std::shared_ptr<Schema>& projection() const;
+  Result<std::shared_ptr<Schema>> schema() const;
 
   /// \brief Returns the scan context.
-  /// \return A reference to the TableScanContext.
-  const TableScanContext& context() const;
+  const internal::TableScanContext& context() const;
 
-  /// \brief Returns the file I/O instance used for reading manifests and data 
files.
-  /// \return A shared pointer to the FileIO instance.
+  /// \brief Returns the file I/O instance used for reading files.
   const std::shared_ptr<FileIO>& io() const;
 
+  /// \brief Returns this scan's filter expression.
+  const std::shared_ptr<Expression>& filter() const;
+
+  /// \brief Returns whether this scan is case-sensitive.
+  bool is_case_sensitive() const;
+
   /// \brief Plans the scan tasks by resolving manifests and data files.
   /// \return A Result containing scan tasks or an error.
   virtual Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const 
= 0;
 
  protected:
-  /// \brief context for the scan, including snapshot, schema, and filter.
-  const TableScanContext context_;
-  /// \brief File I/O instance for reading manifests and data files.
-  std::shared_ptr<FileIO> file_io_;
+  TableScan(std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> 
schema,
+            std::shared_ptr<FileIO> io, internal::TableScanContext context);
+
+  Result<std::reference_wrapper<const std::shared_ptr<Schema>>> 
ResolveProjectedSchema()
+      const;
+
+  virtual const std::vector<std::string>& ScanColumns() const;
+
+  const std::shared_ptr<TableMetadata> metadata_;
+  const std::shared_ptr<Schema> schema_;
+  const std::shared_ptr<FileIO> io_;
+  const internal::TableScanContext context_;
+  mutable std::shared_ptr<Schema> projected_schema_;
 };
 
 /// \brief A scan that reads data files and applies delete files to filter 
rows.
 class ICEBERG_EXPORT DataTableScan : public TableScan {
  public:
-  /// \brief Constructs a DataScan with the given context and file I/O.
-  DataTableScan(TableScanContext context, std::shared_ptr<FileIO> file_io);
+  /// \brief Constructs a DataTableScan instance.
+  static Result<std::unique_ptr<DataTableScan>> Make(
+      std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+      std::shared_ptr<FileIO> io, internal::TableScanContext context);
 
   /// \brief Plans the scan tasks by resolving manifests and data files.
   /// \return A Result containing scan tasks or an error.
   Result<std::vector<std::shared_ptr<FileScanTask>>> PlanFiles() const 
override;
+
+ protected:
+  DataTableScan(std::shared_ptr<TableMetadata> metadata, 
std::shared_ptr<Schema> schema,
+                std::shared_ptr<FileIO> io, internal::TableScanContext 
context);
 };
 
 }  // namespace iceberg
diff --git a/src/iceberg/test/file_scan_task_test.cc 
b/src/iceberg/test/file_scan_task_test.cc
index b7252850..ba0c41b3 100644
--- a/src/iceberg/test/file_scan_task_test.cc
+++ b/src/iceberg/test/file_scan_task_test.cc
@@ -137,7 +137,7 @@ TEST_F(FileScanTaskTest, ReadFullSchema) {
 
   FileScanTask task(data_file);
 
-  auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr);
+  auto stream_result = task.ToArrow(file_io_, projected_schema);
   ASSERT_THAT(stream_result, IsOk());
   auto stream = std::move(stream_result.value());
 
@@ -156,7 +156,7 @@ TEST_F(FileScanTaskTest, ReadProjectedAndReorderedSchema) {
 
   FileScanTask task(data_file);
 
-  auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr);
+  auto stream_result = task.ToArrow(file_io_, projected_schema);
   ASSERT_THAT(stream_result, IsOk());
   auto stream = std::move(stream_result.value());
 
@@ -175,7 +175,7 @@ TEST_F(FileScanTaskTest, ReadEmptyFile) {
 
   FileScanTask task(data_file);
 
-  auto stream_result = task.ToArrow(file_io_, projected_schema, nullptr);
+  auto stream_result = task.ToArrow(file_io_, projected_schema);
   ASSERT_THAT(stream_result, IsOk());
   auto stream = std::move(stream_result.value());
 
diff --git a/src/iceberg/util/snapshot_util.cc 
b/src/iceberg/util/snapshot_util.cc
index e76426f3..2ec36478 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -228,9 +228,30 @@ Result<std::shared_ptr<Snapshot>> 
SnapshotUtil::SnapshotAfter(const Table& table
       snapshot_id);
 }
 
+namespace {
+
+std::optional<int64_t> OptionalSnapshotIdAsOfTimeImpl(const TableMetadata& 
metadata,
+                                                      TimePointMs 
timestamp_ms) {
+  std::optional<int64_t> snapshot_id = std::nullopt;
+  for (const auto& log_entry : metadata.snapshot_log) {
+    if (log_entry.timestamp_ms <= timestamp_ms) {
+      snapshot_id = log_entry.snapshot_id;
+    }
+  }
+  return snapshot_id;
+}
+
+}  // namespace
+
 Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const Table& table,
                                                  TimePointMs timestamp_ms) {
-  auto snapshot_id = OptionalSnapshotIdAsOfTime(table, timestamp_ms);
+  ICEBERG_PRECHECK(table.metadata() != nullptr, "Table metadata is null");
+  return SnapshotIdAsOfTime(*table.metadata(), timestamp_ms);
+}
+
+Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const TableMetadata& metadata,
+                                                 TimePointMs timestamp_ms) {
+  auto snapshot_id = OptionalSnapshotIdAsOfTimeImpl(metadata, timestamp_ms);
   ICEBERG_CHECK(snapshot_id.has_value(), "Cannot find a snapshot older than 
{}",
                 FormatTimePointMs(timestamp_ms));
   return snapshot_id.value();
@@ -238,13 +259,7 @@ Result<int64_t> SnapshotUtil::SnapshotIdAsOfTime(const 
Table& table,
 
 std::optional<int64_t> SnapshotUtil::OptionalSnapshotIdAsOfTime(
     const Table& table, TimePointMs timestamp_ms) {
-  std::optional<int64_t> snapshot_id = std::nullopt;
-  for (const auto& log_entry : table.history()) {
-    if (log_entry.timestamp_ms <= timestamp_ms) {
-      snapshot_id = log_entry.snapshot_id;
-    }
-  }
-  return snapshot_id;
+  return OptionalSnapshotIdAsOfTimeImpl(*table.metadata(), timestamp_ms);
 }
 
 Result<std::shared_ptr<Schema>> SnapshotUtil::SchemaFor(const Table& table,
diff --git a/src/iceberg/util/snapshot_util_internal.h 
b/src/iceberg/util/snapshot_util_internal.h
index e0d8830f..2b11168e 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -170,6 +170,14 @@ class ICEBERG_EXPORT SnapshotUtil {
   /// \return The snapshot ID
   static Result<int64_t> SnapshotIdAsOfTime(const Table& table, TimePointMs 
timestamp_ms);
 
+  /// \brief Returns the ID of the most recent snapshot for the table as of 
the timestamp.
+  ///
+  /// \param metadata The table metadata
+  /// \param timestamp_ms The timestamp in millis since the Unix epoch
+  /// \return The snapshot ID
+  static Result<int64_t> SnapshotIdAsOfTime(const TableMetadata& metadata,
+                                            TimePointMs timestamp_ms);
+
   /// \brief Returns the ID of the most recent snapshot for the table as of 
the timestamp,
   /// or nullopt if not found.
   ///


Reply via email to