HuaHuaY commented on code in PR #489:
URL: https://github.com/apache/iceberg-cpp/pull/489#discussion_r2664231986
##########
src/iceberg/table_scan.cc:
##########
@@ -135,186 +153,327 @@ Result<ArrowArrayStream>
MakeArrowArrayStream(std::unique_ptr<Reader> reader) {
} // namespace
+namespace internal {
+
+Status TableScanContext::Validate() const {
+ ICEBERG_CHECK(columns_to_keep_stats.empty() || return_column_stats,
+ "Cannot select columns to keep stats when column stats are not
returned");
+ ICEBERG_CHECK(projected_schema == nullptr || selected_columns.empty(),
+ "Cannot set projection schema and selected columns at the same
time");
+ ICEBERG_CHECK(!snapshot_id.has_value() ||
+ (!from_snapshot_id.has_value() &&
!to_snapshot_id.has_value()),
+ "Cannot mix snapshot scan and incremental scan");
+ ICEBERG_CHECK(!min_rows_requested.has_value() || min_rows_requested.value()
>= 0,
+ "Min rows requested cannot be negative");
+ return {};
+}
+
+} // namespace internal
+
+ScanTask::~ScanTask() = default;
+
// FileScanTask implementation
FileScanTask::FileScanTask(std::shared_ptr<DataFile> data_file,
std::vector<std::shared_ptr<DataFile>> delete_files,
std::shared_ptr<Expression> residual_filter)
: data_file_(std::move(data_file)),
delete_files_(std::move(delete_files)),
- residual_filter_(std::move(residual_filter)) {}
-
-const std::shared_ptr<DataFile>& FileScanTask::data_file() const { return
data_file_; }
-
-const std::vector<std::shared_ptr<DataFile>>& FileScanTask::delete_files()
const {
- return delete_files_;
-}
-
-const std::shared_ptr<Expression>& FileScanTask::residual_filter() const {
- return residual_filter_;
+ residual_filter_(std::move(residual_filter)) {
+ ICEBERG_DCHECK(data_file_ != nullptr, "Data file cannot be null for
FileScanTask");
}
-bool FileScanTask::has_deletes() const { return !delete_files_.empty(); }
-
-bool FileScanTask::has_residual_filter() const { return residual_filter_ !=
nullptr; }
-
int64_t FileScanTask::size_bytes() const { return
data_file_->file_size_in_bytes; }
int32_t FileScanTask::files_count() const { return 1; }
int64_t FileScanTask::estimated_row_count() const { return
data_file_->record_count; }
Result<ArrowArrayStream> FileScanTask::ToArrow(
- const std::shared_ptr<FileIO>& io, const std::shared_ptr<Schema>&
projected_schema,
- const std::shared_ptr<Expression>& filter) const {
- if (has_deletes()) {
+ const std::shared_ptr<FileIO>& io, std::shared_ptr<Schema>
projected_schema) const {
+ if (!delete_files_.empty()) {
return NotSupported("Reading data files with delete files is not yet
supported.");
}
const ReaderOptions options{.path = data_file_->file_path,
.length = data_file_->file_size_in_bytes,
.io = io,
- .projection = projected_schema,
- .filter = filter};
+ .projection = std::move(projected_schema),
+ .filter = residual_filter_};
ICEBERG_ASSIGN_OR_RAISE(auto reader,
ReaderFactoryRegistry::Open(data_file_->file_format,
options));
return MakeArrowArrayStream(std::move(reader));
}
+Result<std::unique_ptr<TableScanBuilder>> TableScanBuilder::Make(
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<FileIO> io) {
+ ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+ return std::unique_ptr<TableScanBuilder>(
+ new TableScanBuilder(std::move(metadata), std::move(io)));
+}
+
TableScanBuilder::TableScanBuilder(std::shared_ptr<TableMetadata>
table_metadata,
std::shared_ptr<FileIO> file_io)
- : file_io_(std::move(file_io)) {
- context_.table_metadata = std::move(table_metadata);
-}
+ : metadata_(std::move(table_metadata)), io_(std::move(file_io)) {}
-TableScanBuilder& TableScanBuilder::WithColumnNames(
- std::vector<std::string> column_names) {
- column_names_ = std::move(column_names);
+TableScanBuilder& TableScanBuilder::Option(std::string key, std::string value)
{
+ context_.options[std::move(key)] = std::move(value);
return *this;
}
-TableScanBuilder&
TableScanBuilder::WithProjectedSchema(std::shared_ptr<Schema> schema) {
+TableScanBuilder& TableScanBuilder::Project(std::shared_ptr<Schema> schema) {
context_.projected_schema = std::move(schema);
return *this;
}
-TableScanBuilder& TableScanBuilder::WithSnapshotId(int64_t snapshot_id) {
- snapshot_id_ = snapshot_id;
+TableScanBuilder& TableScanBuilder::CaseSensitive(bool case_sensitive) {
+ context_.case_sensitive = case_sensitive;
return *this;
}
-TableScanBuilder& TableScanBuilder::WithFilter(std::shared_ptr<Expression>
filter) {
+TableScanBuilder& TableScanBuilder::IncludeColumnStats() {
+ context_.return_column_stats = true;
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::IncludeColumnStats(
+ const std::vector<std::string>& requested_columns) {
+ context_.return_column_stats = true;
+ context_.columns_to_keep_stats.clear();
+ context_.columns_to_keep_stats.reserve(requested_columns.size());
+
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto schema_ref, ResolveSnapshotSchema());
+ const auto& schema = schema_ref.get();
+ for (const auto& column_name : requested_columns) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto field,
schema->FindFieldByName(column_name));
+ if (field.has_value()) {
+ context_.columns_to_keep_stats.insert(field.value().get().field_id());
+ }
+ }
+
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::Select(const std::vector<std::string>&
column_names) {
+ context_.selected_columns = column_names;
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::Filter(std::shared_ptr<Expression> filter)
{
context_.filter = std::move(filter);
return *this;
}
-TableScanBuilder& TableScanBuilder::WithCaseSensitive(bool case_sensitive) {
- context_.case_sensitive = case_sensitive;
+TableScanBuilder& TableScanBuilder::IgnoreResiduals() {
+ context_.ignore_residuals = true;
return *this;
}
-TableScanBuilder& TableScanBuilder::WithOption(std::string property,
std::string value) {
- context_.options[std::move(property)] = std::move(value);
+TableScanBuilder& TableScanBuilder::MinRowsRequested(int64_t num_rows) {
+ context_.min_rows_requested = num_rows;
return *this;
}
-TableScanBuilder& TableScanBuilder::WithLimit(std::optional<int64_t> limit) {
- context_.limit = limit;
+TableScanBuilder& TableScanBuilder::UseSnapshot(int64_t snapshot_id) {
+ ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
+ "Cannot override snapshot, already set snapshot id={}",
+ context_.snapshot_id.value());
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
metadata_->SnapshotById(snapshot_id));
+ context_.snapshot_id = snapshot_id;
return *this;
}
-Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
- const auto& table_metadata = context_.table_metadata;
- auto snapshot_id = snapshot_id_ ? snapshot_id_ :
table_metadata->current_snapshot_id;
- if (!snapshot_id) {
- return InvalidArgument("No snapshot ID specified for table {}",
- table_metadata->table_uuid);
+TableScanBuilder& TableScanBuilder::UseRef(const std::string& ref) {
+ if (ref == SnapshotRef::kMainBranch) {
+ snapshot_schema_ = nullptr;
+ context_.snapshot_id.reset();
+ return *this;
}
- ICEBERG_ASSIGN_OR_RAISE(context_.snapshot,
table_metadata->SnapshotById(*snapshot_id));
- if (!context_.projected_schema) {
- const auto& snapshot = context_.snapshot;
- auto schema_id = table_metadata->current_schema_id;
- ICEBERG_ASSIGN_OR_RAISE(auto schema,
table_metadata->SchemaById(schema_id));
+ ICEBERG_BUILDER_CHECK(!context_.snapshot_id.has_value(),
+ "Cannot override ref, already set snapshot id={}",
+ context_.snapshot_id.value());
+ auto iter = metadata_->refs.find(ref);
+ ICEBERG_BUILDER_CHECK(iter != metadata_->refs.end(), "Cannot find ref {}",
ref);
+ ICEBERG_BUILDER_CHECK(iter->second != nullptr, "Ref {} is null", ref);
+ int32_t snapshot_id = iter->second->snapshot_id;
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(std::ignore,
metadata_->SnapshotById(snapshot_id));
+ context_.snapshot_id = snapshot_id;
+
+ return *this;
+}
+
+TableScanBuilder& TableScanBuilder::AsOfTime(int64_t timestamp_millis) {
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto time_point_ms,
+ TimePointMsFromUnixMs(timestamp_millis));
+ ICEBERG_BUILDER_ASSIGN_OR_RETURN(
+ auto snapshot_id, SnapshotUtil::SnapshotIdAsOfTime(*metadata_,
time_point_ms));
+ return UseSnapshot(snapshot_id);
+}
+
+TableScanBuilder& TableScanBuilder::FromSnapshot(
+ [[maybe_unused]] int64_t from_snapshot_id, [[maybe_unused]] bool
inclusive) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
- if (column_names_.empty()) {
- context_.projected_schema = schema;
+TableScanBuilder& TableScanBuilder::FromSnapshot([[maybe_unused]] const
std::string& ref,
+ [[maybe_unused]] bool
inclusive) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] int64_t
to_snapshot_id) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::ToSnapshot([[maybe_unused]] const
std::string& ref) {
+ return AddError(NotImplemented("Incremental scan is not implemented"));
+}
+
+TableScanBuilder& TableScanBuilder::UseBranch(const std::string& branch) {
+ context_.branch = branch;
+ return *this;
+}
+
+Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
+TableScanBuilder::ResolveSnapshotSchema() {
+ if (snapshot_schema_ == nullptr) {
+ if (context_.snapshot_id.has_value()) {
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot,
+ metadata_->SnapshotById(*context_.snapshot_id));
+ int32_t schema_id =
snapshot->schema_id.value_or(Schema::kInitialSchemaId);
+ ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_,
metadata_->SchemaById(schema_id));
} else {
- // TODO(gty404): collect touched columns from filter expression
- std::vector<SchemaField> projected_fields;
- projected_fields.reserve(column_names_.size());
- for (const auto& column_name : column_names_) {
- // TODO(gty404): support case-insensitive column names
- auto field_opt = schema->GetFieldByName(column_name);
- if (!field_opt) {
- return InvalidArgument("Column {} not found in schema '{}'",
column_name,
- schema_id);
- }
- projected_fields.emplace_back(field_opt.value()->get());
- }
- context_.projected_schema =
- std::make_shared<Schema>(std::move(projected_fields),
schema->schema_id());
+ ICEBERG_ASSIGN_OR_RAISE(snapshot_schema_, metadata_->Schema());
}
- } else if (!column_names_.empty()) {
- return InvalidArgument(
- "Cannot specify column names when a projected schema is provided");
}
+ ICEBERG_CHECK(snapshot_schema_ != nullptr, "Snapshot schema is null");
+ return snapshot_schema_;
+}
+
+bool TableScanBuilder::IsIncrementalScan() const {
+ return context_.from_snapshot_id.has_value() ||
context_.to_snapshot_id.has_value();
+}
+
+Result<std::unique_ptr<TableScan>> TableScanBuilder::Build() {
+ ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+ ICEBERG_RETURN_UNEXPECTED(context_.Validate());
+
+ if (IsIncrementalScan()) {
+ return NotImplemented("Incremental scan is not yet implemented");
+ }
+
+ ICEBERG_ASSIGN_OR_RAISE(auto schema, ResolveSnapshotSchema());
+ return DataTableScan::Make(metadata_, schema.get(), io_,
std::move(context_));
+}
+
+TableScan::TableScan(std::shared_ptr<TableMetadata> metadata,
+ std::shared_ptr<Schema> schema, std::shared_ptr<FileIO>
file_io,
+ internal::TableScanContext context)
+ : metadata_(std::move(metadata)),
+ schema_(std::move(schema)),
+ io_(std::move(file_io)),
+ context_(std::move(context)) {}
+
+TableScan::~TableScan() = default;
- return std::make_unique<DataTableScan>(std::move(context_), file_io_);
+const std::shared_ptr<TableMetadata>& TableScan::metadata() const { return
metadata_; }
+
+Result<std::shared_ptr<Snapshot>> TableScan::snapshot() const {
+ auto snapshot_id = context_.snapshot_id ? context_.snapshot_id.value()
+ : metadata_->current_snapshot_id;
+ return metadata_->SnapshotById(snapshot_id);
+}
+
+Result<std::shared_ptr<Schema>> TableScan::schema() const {
+ return ResolveProjectedSchema();
+}
+
+const internal::TableScanContext& TableScan::context() const { return
context_; }
+
+const std::shared_ptr<FileIO>& TableScan::io() const { return io_; }
+
+const std::shared_ptr<Expression>& TableScan::filter() const {
+ const static std::shared_ptr<Expression> true_expr = True::Instance();
+ if (!context_.filter) {
+ return true_expr;
+ }
+ return context_.filter;
}
-TableScan::TableScan(TableScanContext context, std::shared_ptr<FileIO> file_io)
- : context_(std::move(context)), file_io_(std::move(file_io)) {}
+bool TableScan::is_case_sensitive() const { return context_.case_sensitive; }
+
+Result<std::reference_wrapper<const std::shared_ptr<Schema>>>
+TableScan::ResolveProjectedSchema() const {
+ if (projected_schema_ != nullptr) {
+ return projected_schema_;
+ }
-const std::shared_ptr<Snapshot>& TableScan::snapshot() const { return
context_.snapshot; }
+ if (!context_.selected_columns.empty()) {
+ /// TODO(gangwu): port Java BaseScan.lazyColumnProjection to collect field
ids
+ /// from selected column names and bound references in the filter, and
then create
+ /// projected schema based on the collected field ids.
+ return NotImplemented(
+ "Selecting columns by name to create projected schema is not yet
implemented");
+ } else if (context_.projected_schema != nullptr) {
+ projected_schema_ = context_.projected_schema;
+ } else {
+ projected_schema_ = schema_;
+ }
-const std::shared_ptr<Schema>& TableScan::projection() const {
- return context_.projected_schema;
+ return projected_schema_;
}
-const TableScanContext& TableScan::context() const { return context_; }
+const std::vector<std::string>& TableScan::ScanColumns() const {
+ return context_.return_column_stats ? kScanColumnsWithStats : kScanColumns;
+}
-const std::shared_ptr<FileIO>& TableScan::io() const { return file_io_; }
+Result<std::unique_ptr<DataTableScan>> DataTableScan::Make(
+ std::shared_ptr<TableMetadata> metadata, std::shared_ptr<Schema> schema,
+ std::shared_ptr<FileIO> io, internal::TableScanContext context) {
+ ICEBERG_PRECHECK(metadata != nullptr, "Table metadata cannot be null");
+ ICEBERG_PRECHECK(schema != nullptr, "Schema cannot be null");
+ ICEBERG_PRECHECK(io != nullptr, "FileIO cannot be null");
+ return std::unique_ptr<DataTableScan>(new DataTableScan(
+ std::move(metadata), std::move(schema), std::move(io),
std::move(context)));
+}
-DataTableScan::DataTableScan(TableScanContext context, std::shared_ptr<FileIO>
file_io)
- : TableScan(std::move(context), std::move(file_io)) {}
+DataTableScan::DataTableScan(std::shared_ptr<TableMetadata> metadata,
+ std::shared_ptr<Schema> schema,
std::shared_ptr<FileIO> io,
+ internal::TableScanContext context)
+ : TableScan(std::move(metadata), std::move(schema), std::move(io),
+ std::move(context)) {}
Result<std::vector<std::shared_ptr<FileScanTask>>> DataTableScan::PlanFiles()
const {
- ICEBERG_ASSIGN_OR_RAISE(
- auto manifest_list_reader,
- ManifestListReader::Make(context_.snapshot->manifest_list, file_io_));
- ICEBERG_ASSIGN_OR_RAISE(auto manifest_files, manifest_list_reader->Files());
-
- std::vector<std::shared_ptr<FileScanTask>> tasks;
- ICEBERG_ASSIGN_OR_RAISE(auto partition_spec,
context_.table_metadata->PartitionSpec());
-
- // Get the table schema
- ICEBERG_ASSIGN_OR_RAISE(auto current_schema,
context_.table_metadata->Schema());
-
- for (const auto& manifest_file : manifest_files) {
- ICEBERG_ASSIGN_OR_RAISE(
- auto manifest_reader,
- ManifestReader::Make(manifest_file, file_io_, current_schema,
partition_spec));
- ICEBERG_ASSIGN_OR_RAISE(auto manifests, manifest_reader->Entries());
-
- // TODO(gty404): filter manifests using partition spec and filter
expression
-
- for (auto& manifest_entry : manifests) {
- const auto& data_file = manifest_entry.data_file;
- switch (data_file->content) {
- case DataFile::Content::kData:
-
tasks.emplace_back(std::make_shared<FileScanTask>(manifest_entry.data_file));
- break;
- case DataFile::Content::kPositionDeletes:
- case DataFile::Content::kEqualityDeletes:
- return NotSupported("Equality/Position deletes are not supported in
data scan");
- }
- }
+ ICEBERG_ASSIGN_OR_RAISE(auto snapshot, this->snapshot());
+ if (!snapshot) {
+ return std::vector<std::shared_ptr<FileScanTask>>{};
}
- return tasks;
+ TableMetadataCache metadata_cache(metadata_.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto specs_by_id,
metadata_cache.GetPartitionSpecsById());
+
+ SnapshotCache snapshot_cache(snapshot.get());
+ ICEBERG_ASSIGN_OR_RAISE(auto data_manifests,
snapshot_cache.DataManifests(io_));
+ ICEBERG_ASSIGN_OR_RAISE(auto delete_manifests,
snapshot_cache.DeleteManifests(io_));
+
+ ICEBERG_ASSIGN_OR_RAISE(
+ auto manifest_group,
+ ManifestGroup::Make(io_, schema_, specs_by_id,
+ {data_manifests.begin(), data_manifests.end()},
+ {delete_manifests.begin(), delete_manifests.end()}));
+ (*manifest_group)
+ .CaseSensitive(context_.case_sensitive)
Review Comment:
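You can use `->` for the first call and `.` for the rest of the chain, e.g. `manifest_group->CaseSensitive(...)` instead of `(*manifest_group).CaseSensitive(...)`.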
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]