This is an automated email from the ASF dual-hosted git repository. suxiaogang223 pushed a commit to branch refact_reader_branch in repository https://gitbox.apache.org/repos/asf/doris.git
commit ef45c206aa3b6dc6fe5ac62b5d1694f90d3cc386 Author: Socrates <[email protected]> AuthorDate: Mon May 18 15:52:10 2026 +0800 Add Iceberg Parquet reader API skeleton --- be/src/format/parquet/parquet_reader.h | 67 +++++ be/src/format/reader/file_reader.h | 141 +++++++++ be/src/format/reader/table_reader.h | 256 ++++++++++++++++ be/src/format/table/iceberg_reader_v2.h | 186 ++++++++++++ docs/doris-iceberg-parquet-api-design.md | 483 +++++++++++++++++++++++++++++++ 5 files changed, 1133 insertions(+) diff --git a/be/src/format/parquet/parquet_reader.h b/be/src/format/parquet/parquet_reader.h new file mode 100644 index 00000000000..dfac6494cd8 --- /dev/null +++ b/be/src/format/parquet/parquet_reader.h @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <vector> + +#include "common/status.h" +#include "format/reader/file_reader.h" + +namespace doris { +namespace io { +struct IOContext; +} // namespace io +} // namespace doris + +namespace doris::parquet { + +// ParquetReader 的 file-local scan 请求。 +// 当前没有新增 Parquet-only 字段,但保留独立类型,便于后续加入 row group/page index +// 等 Parquet 专属选项。 +struct ParquetScanRequest : public reader::FileScanRequest {}; + +// Parquet 文件物理读取层。 +// 该类只理解 Parquet file-local schema 和 ParquetScanRequest,不理解 Iceberg/global +// schema,不处理 table-level cast/default/generated/partition 语义。 +class ParquetReader : public reader::FileReader { +public: + virtual ~ParquetReader() = default; + + Status get_schema(std::vector<reader::SchemaField>* file_schema) const override { + // 真实实现会从 Parquet footer / schema descriptor 展开 file-local schema。 + file_schema->clear(); + return Status::OK(); + } + + Status init(const ParquetScanRequest& request) { + // 真实实现会根据 projected_file_columns、local_filters 和 reader_expression_map + // 初始化 row group、column chunk、page reader 以及延时物化计划。 + return reader::FileReader::init(request); + } + + Status next(Block* file_block, size_t* rows, bool* eof) override { + // 真实实现会输出 file-local block。stub 默认立即 EOF。 + return reader::FileReader::next(file_block, rows, eof); + } + + Status init(const reader::FileScanRequest& request) override { + return reader::FileReader::init(request); + } +}; + +} // namespace doris::parquet diff --git a/be/src/format/reader/file_reader.h b/be/src/format/reader/file_reader.h new file mode 100644 index 00000000000..af03691b94e --- /dev/null +++ b/be/src/format/reader/file_reader.h @@ -0,0 +1,141 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "common/status.h" +#include "core/data_type/data_type.h" +#include "exprs/vexpr_fwd.h" +#include "io/fs/file_reader_writer_fwd.h" + +namespace doris { +class Block; +class ColumnPredicate; + +namespace io { +struct IOContext; +} // namespace io +} // namespace doris + +namespace doris::reader { + +using ColumnId = int32_t; + +// 文件本地 schema 字段。 +// 这是 FileReader 暴露给 table 层的 file-local schema 视图,不携带 table/global +// schema 语义。Iceberg field id、name mapping、default/generated/partition 列都不在 +// FileReader 内部解释。 +struct SchemaField { + ColumnId id = -1; + std::string name; + DataTypePtr type; + std::vector<SchemaField> children; +}; + +// 已经 localize 到文件 schema 的过滤条件。 +// TableColumnMapper 负责把 table-level filter 转成这个结构;FileReader 只消费 +// file-local column id、表达式和结构化谓词。 +struct FileLocalFilter { + ColumnId file_column_id = -1; + + // 表达式过滤。适合 cast、复杂表达式或 reader_expression_map 生成的临时列过滤。 + // 它通常不能直接驱动 row group stats、page index、dictionary、bloom filter。 + VExprContextSPtr conjunct; + + // 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、 + // bloom filter 等只理解单列谓词的优化。 + std::vector<std::shared_ptr<ColumnPredicate>> predicates; +}; + +// 通用文件层 scan 请求。 +// 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global +// schema。所有 schema change、filter localization、default/generated/partition +// 列都应在 table 层完成。 +struct FileScanRequest { + virtual ~FileScanRequest() = default; + + std::vector<ColumnId> projected_file_columns; + std::vector<FileLocalFilter> local_filters; + std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map; +}; + +// 文件物理读取层通用接口。 +// 该接口只描述 file-local schema、file-local scan request 和 file-local block。 +// TableReader/IcebergTableReader 可以通过它组合不同文件格式 reader。 +class FileReader { +public: + virtual ~FileReader() = default; + + virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = nullptr) { + // 真实实现会保存文件句柄、IO 上下文并读取文件元数据。 + _file = std::move(file); + _io_ctx = io_ctx; + _eof = false; + return Status::OK(); + } + + virtual Status get_schema(std::vector<SchemaField>* file_schema) const { + // 真实实现会展开文件格式自己的 file-local schema。 + file_schema->clear(); + return Status::OK(); + } + + virtual Status init(const FileScanRequest& request) { + // 真实实现会根据 projected columns、local filters 和 reader expressions + // 初始化文件格式自己的物理读取计划。 + _request.projected_file_columns = request.projected_file_columns; + _request.local_filters = request.local_filters; + _request.reader_expression_map = request.reader_expression_map; + return Status::OK(); + } + + virtual Status next(Block* file_block, size_t* rows, bool* eof) { + // stub 默认立即 EOF。 + (void)file_block; + if (rows != nullptr) { + *rows = 0; + } + if (eof != nullptr) { + *eof = true; + } + _eof = true; + return Status::OK(); + } + + virtual Status close() { + _file.reset(); + _io_ctx = nullptr; + _request = FileScanRequest {}; + _eof = true; + return Status::OK(); + } + +protected: + io::FileReaderSPtr _file; + io::IOContext* _io_ctx = nullptr; + FileScanRequest _request; + bool _eof = true; +}; + +} // namespace doris::reader diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h new file mode 100644 index 00000000000..422ee3142d1 --- /dev/null +++ b/be/src/format/reader/table_reader.h @@ -0,0 +1,256 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <optional> +#include <string> +#include <utility> +#include <vector> + +#include "common/status.h" +#include "core/data_type/data_type.h" +#include "exprs/vexpr_fwd.h" +#include "format/reader/file_reader.h" + +namespace doris { +class Block; +class ColumnPredicate; +} // namespace doris + +namespace doris::reader { + +// table/global schema 中的列视图。 +// Iceberg 场景下,id 默认对应 Iceberg field id。该结构不描述文件中的物理列。 +struct TableColumn { + ColumnId id = -1; + std::string name; + DataTypePtr type; + std::vector<TableColumn> children; +}; + +// table-level filter。 +// TableColumnMapper 负责把它转换成 FileLocalFilter 或 reader_expression_map。 +struct TableFilter { + ColumnId table_column_id = -1; + + // 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。 + VExprContextSPtr conjunct; + + // 结构化列谓词,适合下推到文件层做 row group stats、page index、dictionary、 + // bloom filter 等优化。 + std::vector<std::shared_ptr<ColumnPredicate>> predicates; +}; + +// 单个 table column 到 file column 的映射结果。 +// 这是 table 层和 file 层的核心边界对象。 +struct ColumnMapping { + ColumnId table_column_id = -1; + std::optional<ColumnId> file_column_id; + DataTypePtr file_type; + DataTypePtr table_type; + + // 最终输出表达式。用于把 file-local value 转成 table/global value,例如 cast、 + // default、partition、generated column 或复杂列 remap。 + VExprContextSPtr finalize_expr; + + // 读时过滤 fallback 表达式。只在 table filter 不能安全转换成 file-local predicate + // 时使用,服务 reader_expression_map,不等价于 finalize_expr。 + VExprContextSPtr reader_filter_expr; + + std::vector<ColumnMapping> child_mappings; + bool is_trivial = false; + bool is_constant = false; +}; + +enum class TableColumnMappingMode { + BY_FIELD_ID, + BY_NAME, +}; + +enum class TableFilterConversion { + COPY_DIRECTLY, + CAST_FILTER, + EVALUATE_EXPRESSION, + FINALIZE_ONLY, +}; + +struct TableColumnMapperOptions { + TableColumnMappingMode mode = TableColumnMappingMode::BY_FIELD_ID; + bool allow_missing_columns = true; + bool enable_reader_expression_fallback = true; +}; + +// table-level scan 请求。 +// 它仍然使用 table/global schema 语义,不能直接传给 FileReader。 +struct TableScanRequest { + std::vector<TableColumn> projected_table_columns; + std::vector<TableFilter> table_filters; +}; + +// 通用 table schema 到 file schema 映射层。 +// Iceberg 会使用 BY_FIELD_ID;普通 by-name 场景可以复用该组件,但不应把它命名成 +// Iceberg-only 组件。 +class TableColumnMapper { +public: + explicit TableColumnMapper(TableColumnMapperOptions options = {}) : _options(std::move(options)) {} + virtual ~TableColumnMapper() = default; + + virtual Status create_mapping(const std::vector<TableColumn>& table_schema, + const std::vector<SchemaField>& file_schema, + std::vector<ColumnMapping>* mappings) { + // 真实实现会做 field id/name matching、类型转换、复杂列 child mapping、缺失列 + // default/partition/generated 表达式构造。 + mappings->clear(); + for (const auto& table_column : table_schema) { + ColumnMapping mapping; + mapping.table_column_id = table_column.id; + mapping.table_type = table_column.type; + if (const auto* file_field = find_file_field(table_column, file_schema)) { + mapping.file_column_id = file_field->id; + mapping.file_type = file_field->type; + mapping.is_trivial = is_same_type(mapping.table_type, mapping.file_type); + } else { + mapping.is_constant = true; + } + mappings->push_back(std::move(mapping)); + } + _mappings = *mappings; + return Status::OK(); + } + + virtual Status create_scan_request(const TableScanRequest& table_request, + const std::vector<ColumnMapping>& mappings, + FileScanRequest* file_request) { + // 真实实现会把 table projection/filter 转换成 file-local projection/filter。 + file_request->projected_file_columns.clear(); + file_request->local_filters.clear(); + file_request->reader_expression_map.clear(); + _mappings = mappings; + for (const auto& table_column : table_request.projected_table_columns) { + const auto* mapping = find_mapping(table_column.id); + if (mapping != nullptr && mapping->file_column_id.has_value()) { + file_request->projected_file_columns.push_back(*mapping->file_column_id); + } + } + RETURN_IF_ERROR(localize_filters(table_request.table_filters, file_request)); + return Status::OK(); + } + + virtual Status localize_filters(const std::vector<TableFilter>& table_filters, + FileScanRequest* file_request) const { + // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和 + // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。 + for (const auto& filter : table_filters) { + const auto* mapping = find_mapping(filter.table_column_id); + if (mapping == nullptr || !mapping->file_column_id.has_value()) { + continue; + } + FileLocalFilter local_filter; + local_filter.file_column_id = *mapping->file_column_id; + local_filter.conjunct = filter.conjunct; + local_filter.predicates = filter.predicates; + file_request->local_filters.push_back(std::move(local_filter)); + } + return Status::OK(); + } + + const std::vector<ColumnMapping>& mappings() const { return _mappings; } + +private: + const SchemaField* find_file_field( + const TableColumn& table_column, + const std::vector<SchemaField>& file_schema) const { + for (const auto& field : file_schema) { + if (_options.mode == TableColumnMappingMode::BY_FIELD_ID && field.id == table_column.id) { + return &field; + } + if (_options.mode == TableColumnMappingMode::BY_NAME && field.name == table_column.name) { + return &field; + } + } + return nullptr; + } + + const ColumnMapping* find_mapping(ColumnId table_column_id) const { + for (const auto& mapping : _mappings) { + if (mapping.table_column_id == table_column_id) { + return &mapping; + } + } + return nullptr; + } + + bool is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const { + return table_type == file_type; + } + +private: + TableColumnMapperOptions _options; + std::vector<ColumnMapping> _mappings; +}; + +struct TableReadOptions { + size_t batch_size = 4096; +}; + +// table-level reader 基类。 +// 该层负责多文件编排和动态分区裁剪等通用 table-level 逻辑,对外输出 table block。 +class TableReader { +public: + virtual ~TableReader() = default; + + virtual Status init(const TableReadOptions& options) { + _options = options; + return Status::OK(); + } + + virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all) { + // 真实实现会基于 split/partition/file stats 判断动态分区裁剪结果。 + (void)expr; + if (can_filter_all != nullptr) { + *can_filter_all = false; + } + return Status::OK(); + } + + virtual Status next_reader() { + // 真实实现会切换到下一个 data file / split reader。 + return Status::OK(); + } + + virtual Status next(Block* table_block, size_t* rows, bool* eof) { + (void)table_block; + if (rows != nullptr) { + *rows = 0; + } + if (eof != nullptr) { + *eof = true; + } + return Status::OK(); + } + + virtual Status close() { return Status::OK(); } + +protected: + TableReadOptions _options; +}; + +} // namespace doris::reader diff --git a/be/src/format/table/iceberg_reader_v2.h b/be/src/format/table/iceberg_reader_v2.h new file mode 100644 index 00000000000..70ee2bb3ff5 --- /dev/null +++ b/be/src/format/table/iceberg_reader_v2.h @@ -0,0 +1,186 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <cstddef> +#include <cstdint> +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "common/status.h" +#include "format/reader/file_reader.h" +#include "format/reader/table_reader.h" + +namespace doris { +class Block; +} // namespace doris + +namespace doris::iceberg { + +// Iceberg data file 摘要。它描述当前要读取的物理 data file,不承载列映射逻辑。 +struct IcebergDataFile { + std::string path; + std::string format; + int64_t record_count = 0; + int64_t file_size = 0; + int64_t sequence_number = 0; + int64_t first_row_id = -1; +}; + +// Iceberg delete file 摘要。position/equality/deletion vector 的具体读取在 +// IcebergTableReader 实现阶段补齐。 +struct IcebergDeleteFile { + std::string path; + std::string format; + int64_t sequence_number = 0; + std::vector<reader::ColumnId> equality_field_ids; +}; + +// 单个 Iceberg data file 的 scan 输入。 +// 该结构只进入 IcebergTableReader,不直接传给 ParquetReader。 +struct IcebergScanTask { + IcebergDataFile data_file; + std::vector<IcebergDeleteFile> positional_deletes; + std::vector<IcebergDeleteFile> equality_deletes; + std::vector<IcebergDeleteFile> deletion_vectors; +}; + +struct IcebergReadOptions { + reader::TableReadOptions table_options; + bool enable_position_delete = true; + bool enable_equality_delete = true; + bool enable_deletion_vector = true; +}; + +// Iceberg table-level reader。 +// 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合 +// FileReader 完成 data file 物理读取,不继承具体文件格式 reader。 +class IcebergTableReader : public reader::TableReader { +public: + IcebergTableReader() = default; + + explicit IcebergTableReader(std::unique_ptr<reader::FileReader> data_reader) + : _data_reader(std::move(data_reader)) {} + + ~IcebergTableReader() override = default; + + Status init(const IcebergReadOptions& options, + std::unique_ptr<reader::FileReader> data_reader) { + _iceberg_options = options; + _data_reader = std::move(data_reader); + return reader::TableReader::init(options.table_options); + } + + Status bind(const std::vector<reader::TableColumn>& iceberg_schema) { + // 真实实现会绑定 Iceberg 当前 schema,并准备 field-id based mapping 输入。 + _iceberg_schema = iceberg_schema; + return Status::OK(); + } + + Status init(const reader::TableScanRequest& request) { + // 保存 table-level projection/filter,后续由 TableColumnMapper 转成 FileScanRequest。 + _table_scan_request = request; + return Status::OK(); + } + + Status open_task(const IcebergScanTask& task) { + // 真实实现会读取 data file schema,创建 field-id mapping,应用 position deletes, + // 并初始化底层 ParquetReader。 + _scan_task = task; + std::vector<reader::SchemaField> file_schema; + if (_data_reader) { + RETURN_IF_ERROR(_data_reader->get_schema(&file_schema)); + } + reader::TableColumnMapperOptions mapper_options; + mapper_options.mode = reader::TableColumnMappingMode::BY_FIELD_ID; + _column_mapper = reader::TableColumnMapper(mapper_options); + RETURN_IF_ERROR(_column_mapper.create_mapping(_iceberg_schema, file_schema, &_mappings)); + + reader::FileScanRequest file_request; + RETURN_IF_ERROR(_column_mapper.create_scan_request(_table_scan_request, _mappings, + &file_request)); + RETURN_IF_ERROR(apply_position_deletes(&file_request)); + if (_data_reader) { + RETURN_IF_ERROR(_data_reader->init(file_request)); + } + return Status::OK(); + } + + Status next(Block* table_block, size_t* rows, bool* eof) override { + // 真实实现会读取 file-local block,finalize 成 table block,再应用 equality delete + // 和 Iceberg virtual columns。stub 默认 EOF。 + // 后续实现应在 IcebergTableReader 内部持有 file-local block;这里仅复用输出指针 + // 作为 header-only API 占位,避免在骨架阶段引入 Block 的完整定义。 + Block* file_block = table_block; + if (_data_reader) { + RETURN_IF_ERROR(_data_reader->next(file_block, rows, eof)); + } + RETURN_IF_ERROR(finalize_chunk(file_block, table_block)); + RETURN_IF_ERROR(apply_equality_deletes(table_block)); + RETURN_IF_ERROR(materialize_virtual_columns(table_block, rows != nullptr ? *rows : 0)); + return Status::OK(); + } + + Status finalize_chunk(Block* file_block, Block* table_block) { + // 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated + // expressions,把 file-local block 写成 table block。 + (void)file_block; + (void)table_block; + return Status::OK(); + } + + Status apply_position_deletes(reader::FileScanRequest* request) { + // 真实实现会把 position delete / deletion vector 转换成 file-local delete 信息。 + (void)request; + return Status::OK(); + } + + Status apply_equality_deletes(Block* table_block) { + // 真实实现会在 table block 上应用 equality delete。 + (void)table_block; + return Status::OK(); + } + + Status materialize_virtual_columns(Block* table_block, size_t rows) { + // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。 + (void)table_block; + (void)rows; + return Status::OK(); + } + + Status close() override { + if (_data_reader) { + RETURN_IF_ERROR(_data_reader->close()); + } + _data_reader.reset(); + return Status::OK(); + } + +private: + IcebergReadOptions _iceberg_options; + IcebergScanTask _scan_task; + reader::TableScanRequest _table_scan_request; + std::vector<reader::TableColumn> _iceberg_schema; + std::vector<reader::ColumnMapping> _mappings; + reader::TableColumnMapper _column_mapper; + std::unique_ptr<reader::FileReader> _data_reader; +}; + +} // namespace doris::iceberg diff --git a/docs/doris-iceberg-parquet-api-design.md b/docs/doris-iceberg-parquet-api-design.md new file mode 100644 index 00000000000..58036667d44 --- /dev/null +++ b/docs/doris-iceberg-parquet-api-design.md @@ -0,0 +1,483 @@ +# Doris Iceberg + Parquet 新架构 API 设计 + +本文档用于描述 Doris 中 Iceberg + Parquet 新架构的 API 设计。本文档作为后续从 +`master` 新开重构分支时的起点,只定义 API 形状、职责边界、依赖方向和兼容原则, +不定义函数实现细节,不提供伪代码,不包含迁移 patch。 + +## 架构总览 + +目标架构包含 table 调度层、表格式语义层、schema 映射层、文件通用层和文件格式实现层: + +```text +FileScanner / split producer + -> +TableReader + -> +IcebergTableReader + -> +TableColumnMapper + FileReader + -> +ParquetReader +``` + +核心职责如下: + +- `TableReader` + 负责多文件、多 split 的上层调度,统一 scan 生命周期,对外输出 table block, + 并承接动态分区裁剪等 table-level 通用逻辑。 +- `IcebergTableReader` + 负责 Iceberg 表语义,包括 schema 绑定、scan task、delete file、虚拟列和 table + block finalize。 +- `TableColumnMapper` + 负责 table schema 到 file schema 的映射,负责 filter localization 和 schema + change 映射。 +- `FileReader` + 负责文件层通用读取接口,只理解 file-local schema 和 file-local scan request。 +- `ParquetReader` + 作为 `FileReader` 的 Parquet 实现,负责 Parquet 文件物理读取。 + +依赖方向必须保持单向: + +```text +TableReader + -> IcebergTableReader + -> TableColumnMapper + -> FileReader + -> ParquetReader +``` + +低层不反向理解高层语义,尤其 `ParquetReader` 不得反向理解 Iceberg/global schema。 + +## 核心 API 设计 + +### TableReader + +`TableReader` 是最上层读取接口,作为 `IcebergTableReader` 的基类,负责多 split / +多 file 调度,并承接 table-level 的通用裁剪逻辑,不下沉文件格式语义。 + +实际 API 文件: + +```text +be/src/format/reader/table_reader.h +``` + +实际命名空间: + +```cpp +namespace doris::reader +``` + +建议职责: + +- 接收 split 列表或 scan task 列表; +- 控制当前 reader 的创建、切换和关闭; +- 管理 scan 生命周期; +- 承接动态分区裁剪等 table-level 通用过滤逻辑; +- 对外统一输出 table block。 + +建议接口形状: + +```cpp +namespace doris::reader { + +class TableReader { +public: + virtual ~TableReader() = default; + + virtual Status init(const TableReadOptions& options); + virtual Status filter(const VExprContextSPtr& expr, bool* can_filter_all); + virtual Status next_reader(); + virtual Status next(Block* table_block, size_t* rows, bool* eof); + virtual Status close(); +}; + +} // namespace doris::reader +``` + +接口约束: + +- `TableReader` 输出的是 table block,不输出 file-local block。 +- `TableReader` 负责多文件编排和 table-level 通用裁剪,不负责 schema mapping,不负责 + Parquet 物理解码。 +- 动态分区裁剪这类逻辑应下放到 `TableReader`,而不是散落在具体表格式 reader 中。 +- `TableReader` 不直接依赖旧 `vparquet` 表层语义。 + +### IcebergTableReader + +`IcebergTableReader` 是 Iceberg 表语义层,负责把单个 Iceberg data file 的读取组织成 +table 语义输出。 + +实际 API 文件: + +```text +be/src/format/table/iceberg_reader_v2.h +``` + +实际命名空间: + +```cpp +namespace doris::iceberg +``` + +建议职责: + +- 绑定 Iceberg 当前 table schema; +- 接收 `IcebergScanTask`; +- 处理 position delete、equality delete、deletion vector; +- 物化 `_row_id`、`_last_updated_sequence_number` 等虚拟列; +- 将 `ParquetReader` 返回的 file-local block finalize 成 table block。 + +建议接口形状: + +```cpp +namespace doris::iceberg { + +class IcebergTableReader : public reader::TableReader { +public: + virtual ~IcebergTableReader() = default; + + Status init(const IcebergReadOptions& options, + std::unique_ptr<reader::FileReader> data_reader); + Status bind(const std::vector<reader::TableColumn>& iceberg_schema); + Status init(const reader::TableScanRequest& request); + Status open_task(const IcebergScanTask& task); + Status next(Block* table_block, size_t* rows, bool* eof) override; + Status close() override; +}; + +} // namespace doris::iceberg +``` + +接口约束: + +- `IcebergTableReader` 继承 `TableReader`,并通过组合使用 `FileReader`。 +- `IcebergTableReader` 不做 Parquet page/column 解码。 +- `IcebergTableReader` 负责 table-level finalize,不负责 file-local pruning 实现。 + +### TableColumnMapper + +`TableColumnMapper` 是 table schema 到 file schema 的通用映射层,不是 +Iceberg-only 组件。 + +实际 API 文件: + +```text +be/src/format/reader/table_reader.h +``` + +实际命名空间: + +```cpp +namespace doris::reader +``` + +建议职责: + +- 输入 table schema、file schema、table scan request; +- 输出 `ColumnMapping` 和通用 `FileScanRequest`; +- 负责 filter localization; +- 负责 schema change 映射; +- 负责复杂列 child mapping; +- 负责缺失列、default、partition、generated 列的 finalize 语义描述。 + +建议接口形状: + +```cpp +namespace doris::reader { + +class TableColumnMapper { +public: + explicit TableColumnMapper(TableColumnMapperOptions options = {}); + + virtual Status create_mapping(const std::vector<TableColumn>& table_schema, + const std::vector<SchemaField>& file_schema, + std::vector<ColumnMapping>* mappings); + + virtual Status create_scan_request(const TableScanRequest& table_request, + const std::vector<ColumnMapping>& mappings, + FileScanRequest* file_request); +}; + +} // namespace doris::reader +``` + +接口约束: + +- `TableColumnMapper` 的输入是 table schema + file schema + table scan request。 +- `TableColumnMapper` 的输出是 `ColumnMapping` + `FileScanRequest`。 +- `TableColumnMapper` 必须是通用层,不做 Iceberg-only 命名。 +- Iceberg 场景默认按 field id 映射;按 name 映射不是本轮默认路径。 + +### FileReader + +`FileReader` 是文件物理读取层的通用接口,为后续 Parquet 之外的文件格式适配预留。 + +实际 API 文件: + +```text +be/src/format/reader/file_reader.h +``` + +实际命名空间: + +```cpp +namespace doris::reader +``` + +建议职责: + +- 打开物理文件; +- 暴露 file-local schema; +- 接收 `FileScanRequest`; +- 输出 file-local block; +- 不理解 table/global schema。 + +建议接口形状: + +```cpp +namespace doris::reader { + +class FileReader { +public: + virtual ~FileReader() = default; + + virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = nullptr); + virtual Status get_schema(std::vector<SchemaField>* file_schema) const; + virtual Status init(const FileScanRequest& request); + virtual Status next(Block* file_block, size_t* rows, bool* eof); + virtual Status close(); +}; + +} // namespace doris::reader +``` + +接口约束: + +- `FileReader` 输出的是 file-local block,不输出 table/global schema block。 +- `FileReader` 不处理 Iceberg schema evolution、default/generated/partition 列。 +- `IcebergTableReader` 组合 `FileReader`,不直接绑定具体文件格式 reader。 + +### ParquetReader + +`ParquetReader` 是 `FileReader` 的 Parquet 实现,只负责 Parquet file-local schema +和 Parquet file-local scan request。 + +实际 API 文件: + +```text +be/src/format/parquet/parquet_reader.h +``` + +实际命名空间: + +```cpp +namespace doris::parquet +``` + +建议职责: + +- 打开 Parquet 文件; +- 解析 footer 和 file schema; +- 接收 `ParquetScanRequest` 或通用 `FileScanRequest`; +- 执行 file-local projection 和 file-local filter; +- 输出 file-local block。 + +建议接口形状: + +```cpp +namespace doris::parquet { + +class ParquetReader : public reader::FileReader { +public: + virtual ~ParquetReader() = default; + + virtual Status open(io::FileReaderSPtr file, io::IOContext* io_ctx = nullptr); + virtual Status get_schema(std::vector<reader::SchemaField>* file_schema) const; + virtual Status init(const ParquetScanRequest& request); + virtual Status next(Block* file_block, size_t* rows, bool* eof); + virtual Status close(); +}; + +} // namespace doris::parquet +``` + +接口约束: + +- `ParquetReader` 输出的是 file-local block,不输出 table/global schema block。 +- `ParquetReader` 不理解 Iceberg schema evolution。 +- `ParquetReader` 不负责 default/generated/partition 列。 +- 任何 table-level cast/default/generated/partition 语义都不能重新塞回 + `ParquetReader`。 + +## 关键类型 + +### SchemaField + +`SchemaField` 表示文件层 schema 中的列定义。 + +建议包含的信息: + +- file-local column id; +- 列名; +- 类型; +- child fields。 + +它服务于 `TableColumnMapper` 做 schema matching,不携带 table-level 语义。 + +### TableColumn + +`TableColumn` 表示 table/global schema 中的列定义。 + +建议包含的信息: + +- table column id; +- 列名; +- 类型; +- child columns。 + +Iceberg 场景下,column id 默认对应 field id。 + +### TableFilter + +`TableFilter` 表示 table 层过滤条件。 + +建议包含的信息: + +- `table_column_id` +- `conjunct` +- `predicates` + +职责约束: + +- `conjunct` 偏表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义; +- `predicates` 偏结构化单列下推,适合驱动 row group stats、page index、dictionary、 + bloom filter 等文件层优化。 + +### FileLocalFilter + +`FileLocalFilter` 表示已经 localize 到 file-local schema 的过滤条件。 + +建议包含的信息: + +- `file_column_id` +- `conjunct` +- `predicates` + +职责约束: + +- `conjunct` 用于 file-local 表达式过滤; +- `predicates` 用于 file-local 结构化下推; +- 其输入必须来自 `TableColumnMapper`,不能由具体文件 reader 自己推导 table 语义。 + +### ColumnMapping + +`ColumnMapping` 是 table schema 与 file schema 之间的核心边界对象。 + +建议包含的信息: + +- `table_column_id` +- `file_column_id` +- `file_type` +- `table_type` +- `finalize_expr` +- `reader_filter_expr` +- `child_mappings` + +职责约束: + +- `finalize_expr` 服务最终输出,把 file-local value 转成 table/global value; +- `reader_filter_expr` 服务读时 filter fallback; +- 二者语义不同,不能混用; +- `child_mappings` 用于复杂列 remap、复杂列裁剪和复杂列 schema change。 + +### TableScanRequest + +`TableScanRequest` 描述 table 层 scan 请求。 + +建议包含的信息: + +- projected table columns; +- table filters。 + +它由 `IcebergTableReader` 接收,再交给 `TableColumnMapper` 生成 file-local request。 + +### ParquetScanRequest + +`ParquetScanRequest` 继承 `FileScanRequest`,描述 Parquet file-local scan 请求。 + +### FileScanRequest + +`FileScanRequest` 描述通用 file-local scan 请求。 + +建议包含的信息: + +- projected file columns; +- local filters; +- reader expression map。 + +它是 `FileReader` 的唯一 scan 输入,不包含 table/global schema 语义。 + +### IcebergScanTask + +`IcebergScanTask` 表示一次 Iceberg data file 读取任务。 + +建议包含的信息: + +- data file 信息; +- position delete 文件; +- equality delete 文件; +- deletion vector 信息。 + +它是 `IcebergTableReader` 的输入,不应直接传给 `ParquetReader`。 + +## 设计原则 + +### 边界原则 + +- `FileReader` 不理解 global schema,不直接处理 Iceberg schema evolution。 +- `ParquetReader` 是 `FileReader` 的 Parquet 实现。 +- `TableColumnMapper` 是 schema mapping 和 filter localization 的唯一入口。 +- `IcebergTableReader` 不做 Parquet 解码,只负责 table-level finalize、delete、 + virtual columns。 +- `TableReader` 只负责多文件编排和 table-level 通用裁剪,不下沉文件格式语义。 +- 任何 table-level cast/default/generated/partition 语义都不能重新塞回 + `ParquetReader`。 + +### 依赖原则 + +- 低层不能反向依赖高层语义。 +- `FileReader` 只依赖 file-local request。 +- `IcebergTableReader` 继承 `TableReader`,复用其多文件编排和通用裁剪能力。 +- `IcebergTableReader` 通过组合使用 `FileReader`。 +- `TableColumnMapper` 可以被 Iceberg 之外的其他表格式复用。 + +### 命名原则 + +- 表层抽象使用 `TableReader`、`IcebergTableReader`、`TableColumnMapper`、 + `FileReader`、`ParquetReader` 命名。 +- `TableColumnMapper` 不使用 Iceberg-only 命名。 +- file schema 类型使用 `SchemaField`,table schema 类型使用 `TableColumn`。 + +## 兼容原则 + +新架构重构期间,新旧代码允许并存,但必须遵守以下约束: + +- 旧 `vparquet` / Hive / Hudi / Paimon 路径在新架构稳定前允许保留。 +- 新架构实现不得继续向旧 `vparquet` 表层语义回灌依赖。 +- 先搭新框架 API,再逐步迁移调用点。 +- 不允许边改 API 边混入临时裸逻辑、实验性草稿或未收敛命名。 +- 兼容层可能需要存在,但本文档不定义兼容层的具体实现方案。 + +## 验收标准 + +该文档应满足以下目标: + +- 不引用错误实验代码作为既成事实; +- 不出现实现性草稿、裸伪代码、未收敛命名混用; +- 让另一个工程师从 `master` 新开分支时,可以直接按本文档搭 API 骨架; +- 读完文档后,不需要再讨论以下问题: + - 新架构分几层; + - 每层负责什么; + - 哪层理解 global schema; + - 哪层做 schema change / filter localization / finalize; + - 哪层允许依赖旧实现,哪层不允许。 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
