This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new 1676f2ef64f fix compiling (#63368)
1676f2ef64f is described below
commit 1676f2ef64fb4651a15d6e8df8c0b2e06bcb15e8
Author: Gabriel <[email protected]>
AuthorDate: Mon May 18 20:22:03 2026 +0800
fix compiling (#63368)
---
be/src/format/reader/file_reader.h | 6 +-
be/src/format/reader/table_reader.h | 167 ++++++++++++++++++------------
be/src/format/table/iceberg_reader_v2.cpp | 20 ++++
be/src/format/table/iceberg_reader_v2.h | 150 +++------------------------
4 files changed, 141 insertions(+), 202 deletions(-)
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index fd5bfcf933f..e9fb0f3c963 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -122,12 +122,8 @@ public:
// 读取下一批 file-local block。
// file_block 的列顺序和类型必须遵守 FileScanRequest,而不是 table/global schema。
// eof 表示当前文件 reader 是否读完;多文件切换由 TableReader 负责。
- virtual Status next(Block* file_block, size_t* rows, bool* eof) {
+ virtual Status get_block(Block* file_block, bool* eof) {
// stub 默认立即 EOF。
- (void)file_block;
- if (rows != nullptr) {
- *rows = 0;
- }
if (eof != nullptr) {
*eof = true;
}
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 8d88ce4fe1b..99dcc507e5d 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -17,6 +17,8 @@
#pragma once
+#include <bvar/status.h>
+
#include <cstddef>
#include <cstdint>
#include <memory>
@@ -26,6 +28,7 @@
#include <vector>
#include "common/status.h"
+#include "core/block/block.h"
#include "core/data_type/data_type.h"
#include "exprs/vexpr_fwd.h"
#include "format/reader/file_reader.h"
@@ -110,7 +113,8 @@ struct TableScanRequest {
// Iceberg-only 组件。
class TableColumnMapper {
public:
- explicit TableColumnMapper(TableColumnMapperOptions options = {}) :
_options(std::move(options)) {}
+ explicit TableColumnMapper(TableColumnMapperOptions options = {})
+ : _options(std::move(options)) {}
virtual ~TableColumnMapper() = default;
// 建立 table schema 到 file schema 的列映射。
@@ -184,14 +188,15 @@ public:
const std::vector<ColumnMapping>& mappings() const { return _mappings; }
private:
- const SchemaField* find_file_field(
- const TableColumn& table_column,
- const std::vector<SchemaField>& file_schema) const {
+ const SchemaField* find_file_field(const TableColumn& table_column,
+ const std::vector<SchemaField>&
file_schema) const {
for (const auto& field : file_schema) {
- if (_options.mode == TableColumnMappingMode::BY_FIELD_ID &&
field.id == table_column.id) {
+ if (_options.mode == TableColumnMappingMode::BY_FIELD_ID &&
+ field.id == table_column.id) {
return &field;
}
- if (_options.mode == TableColumnMappingMode::BY_NAME && field.name
== table_column.name) {
+ if (_options.mode == TableColumnMappingMode::BY_NAME &&
+ field.name == table_column.name) {
return &field;
}
}
@@ -216,8 +221,28 @@ private:
std::vector<ColumnMapping> _mappings;
};
+struct BaseDataFile {
+ virtual ~BaseDataFile() = default;
+
+ std::string path;
+ std::string format;
+ int64_t record_count = 0;
+ int64_t file_size = 0;
+};
+
+struct ScanTask {
+ virtual ~ScanTask() = default;
+
+ std::unique_ptr<BaseDataFile> data_file;
+};
+
struct TableReadOptions {
size_t batch_size = 4096;
+ // TODO: deleted? SCHEMA should be derived from table metadata and inited
by TableReader it self? it shouldn't be part of read options.
+ std::vector<TableColumn> schema;
+ TableScanRequest scan_request;
+ // Each task denotes a descriptor of a single file to read, along with
file-level metadata such as stats and delete files.
+ std::vector<std::unique_ptr<ScanTask>> scan_tasks;
};
// table-level reader 基类。
@@ -228,9 +253,12 @@ public:
virtual ~TableReader() = default;
// 初始化 table reader 的通用运行参数。
- // 子类可以在自己的 init(params) 中调用该方法;这里不接收具体表格式 schema/task。
- virtual Status init(const TableReadOptions& options) {
- _options = options;
+ // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
+ virtual Status init(TableReadOptions options) {
+ _schema = std::move(_options.schema);
+ _table_scan_request = std::move(_options.scan_request);
+ _scan_tasks = std::move(_options.scan_tasks);
+ _next_task_idx = 0;
return Status::OK();
}
@@ -249,91 +277,102 @@ public:
// 对外读取 table block 的统一入口。
// 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
// table_block 的列必须已经是 table/global schema 语义。
- Status next(Block* table_block, size_t* rows, bool* eof) {
- if (rows != nullptr) {
- *rows = 0;
- }
- if (eof != nullptr) {
- *eof = false;
+ Status get_block(Block* block, bool* eos) {
+ if (eos != nullptr) {
+ *eos = false;
}
- while (true) {
- if (!_has_current_reader) {
- RETURN_IF_ERROR(next_reader());
- if (!_has_current_reader) {
- if (eof != nullptr) {
- *eof = true;
- }
+ while (block->empty() && !*eos) {
+ if (!_data_reader) {
+ RETURN_IF_ERROR(create_next_reader(eos));
+ if (!_data_reader) {
+ DCHECK(*eos);
return Status::OK();
}
}
- size_t current_rows = 0;
bool current_eof = false;
- RETURN_IF_ERROR(read_current(table_block, ¤t_rows,
¤t_eof));
- if (rows != nullptr) {
- *rows = current_rows;
- }
- if (!current_eof || current_rows > 0) {
- return Status::OK();
+ RETURN_IF_ERROR(_data_reader->get_block(block, ¤t_eof));
+ RETURN_IF_ERROR(finalize_chunk(block));
+ RETURN_IF_ERROR(materialize_virtual_columns(block));
+ if (current_eof) {
+ RETURN_IF_ERROR(close_current_reader());
}
- RETURN_IF_ERROR(close_current_reader());
- _has_current_reader = false;
}
+ return Status::OK();
}
// 关闭 table reader 及当前正在读取的底层 reader。
// 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。
virtual Status close() {
- RETURN_IF_ERROR(close_current_reader());
- _has_current_reader = false;
+ if (_data_reader) {
+ RETURN_IF_ERROR(close_current_reader());
+ }
return Status::OK();
}
protected:
// 切换到下一个 reader 的通用流程。
- // 该方法先关闭当前 reader,再调用 open_next_reader;子类不应重复实现这个循环。
- Status next_reader() {
+ // 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
+ Status create_next_reader(bool* eos) {
// 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
- // 子类只通过 open_next_reader 提供具体表格式的 task/split 打开方式。
- RETURN_IF_ERROR(close_current_reader());
- bool has_reader = false;
- RETURN_IF_ERROR(open_next_reader(&has_reader));
- _has_current_reader = has_reader;
+ DCHECK(_data_reader == nullptr);
+ // TODO: 创建_data_reader
+ // _data_reader = std::make_unique<FileReader>(...);
+ if (!_data_reader) {
+ if (eos != nullptr) {
+ *eos = true;
+ }
+ return Status::OK();
+ }
+ RETURN_IF_ERROR(open_reader());
return Status::OK();
}
- // 打开下一个具体 reader。
- // 子类在这里选择下一个 split/task,创建或重置底层 FileReader,并设置 has_reader。
- // has_reader=false 表示没有更多输入,TableReader::next 会返回 eof=true。
- virtual Status open_next_reader(bool* has_reader) {
- // stub 默认没有下一个 reader。
- if (has_reader != nullptr) {
- *has_reader = false;
- }
+ // 打开当前具体 reader。
+ // 子类在这里基于当前 split/task 初始化底层 FileReader。
+ virtual Status open_reader() {
+ std::vector<SchemaField> file_schema;
+ RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
+ TableColumnMapperOptions mapper_options;
+ mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+ _column_mapper = TableColumnMapper(mapper_options);
+ RETURN_IF_ERROR(_column_mapper.create_mapping(_schema, file_schema,
&_mappings));
+
+ FileScanRequest file_request;
+ RETURN_IF_ERROR(
+ _column_mapper.create_scan_request(_table_scan_request,
_mappings, &file_request));
+ RETURN_IF_ERROR(_data_reader->init(file_request));
return Status::OK();
}
- // 从当前 reader 读取一批 table block。
- // 子类应在这里读取 file-local block,并完成 delete、virtual column、finalize_expr
- // 等 table-level 处理,最终写入 table_block。
- virtual Status read_current(Block* table_block, size_t* rows, bool* eof) {
- // stub 默认当前 reader 立即 EOF。
- (void)table_block;
- if (rows != nullptr) {
- *rows = 0;
- }
- if (eof != nullptr) {
- *eof = true;
- }
+ // 关闭当前具体 reader。
+ // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
+ virtual Status close_current_reader() {
+ RETURN_IF_ERROR(_data_reader->close());
+ _data_reader.reset();
return Status::OK();
}
- // 关闭当前具体 reader。
- // 该 hook 会被 next_reader 和 close 调用;实现应保持幂等。
- virtual Status close_current_reader() { return Status::OK(); }
+ // 将 file-local block 转换为 table/global schema block。
+ // 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
+ // 物化以及复杂列 remap。
+ virtual Status finalize_chunk(Block* block) { return Status::OK(); }
+
+ // 物化虚拟列。
+ // 例如 _row_id、_last_updated_sequence_number 等,它们不来自文件物理列。
+ virtual Status materialize_virtual_columns(Block* table_block) {
+ // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
+ return Status::OK();
+ }
TableReadOptions _options;
- bool _has_current_reader = false;
+ std::unique_ptr<FileReader> _data_reader;
+ std::vector<std::unique_ptr<ScanTask>> _scan_tasks;
+ TableScanRequest _table_scan_request;
+ std::vector<TableColumn> _schema;
+ std::vector<ColumnMapping> _mappings;
+ TableColumnMapper _column_mapper;
+ size_t _next_task_idx = 0;
};
} // namespace doris::reader
diff --git a/be/src/format/table/iceberg_reader_v2.cpp
b/be/src/format/table/iceberg_reader_v2.cpp
new file mode 100644
index 00000000000..220f153e93f
--- /dev/null
+++ b/be/src/format/table/iceberg_reader_v2.cpp
@@ -0,0 +1,20 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/table/iceberg_reader_v2.h"
+
+namespace doris::iceberg {} // namespace doris::iceberg
diff --git a/be/src/format/table/iceberg_reader_v2.h
b/be/src/format/table/iceberg_reader_v2.h
index 29b556f71ed..3ddadc9f9de 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -35,51 +35,26 @@ class Block;
namespace doris::iceberg {
// Iceberg data file 摘要。它描述当前要读取的物理 data file,不承载列映射逻辑。
-struct IcebergDataFile {
- std::string path;
- std::string format;
- int64_t record_count = 0;
- int64_t file_size = 0;
+struct IcebergDataFile final : public reader::BaseDataFile {
int64_t sequence_number = 0;
int64_t first_row_id = -1;
};
// Iceberg delete file 摘要。position/equality/deletion vector 的具体读取在
// IcebergTableReader 实现阶段补齐。
-struct IcebergDeleteFile {
- std::string path;
- std::string format;
+struct IcebergDeleteFile final : public reader::BaseDataFile {
int64_t sequence_number = 0;
std::vector<reader::ColumnId> equality_field_ids;
};
// 单个 Iceberg data file 的 scan 输入。
// 该结构只进入 IcebergTableReader,不直接传给 ParquetReader。
-struct IcebergScanTask {
- IcebergDataFile data_file;
+struct IcebergScanTask final : public reader::ScanTask {
std::vector<IcebergDeleteFile> positional_deletes;
std::vector<IcebergDeleteFile> equality_deletes;
std::vector<IcebergDeleteFile> deletion_vectors;
};
-struct IcebergReadOptions {
- reader::TableReadOptions table_options;
- bool enable_position_delete = true;
- bool enable_equality_delete = true;
- bool enable_deletion_vector = true;
-};
-
-// IcebergTableReader 的完整初始化输入。
-// 这些字段共同决定一次 table scan 的语义,除非后续有明确的生命周期差异,否则不拆成
-// bind/init/set_tasks 多个阶段,避免调用点暴露半初始化状态。
-struct IcebergTableReadParams {
- IcebergReadOptions options;
- std::vector<reader::TableColumn> iceberg_schema;
- reader::TableScanRequest scan_request;
- std::vector<IcebergScanTask> scan_tasks;
- std::unique_ptr<reader::FileReader> data_reader;
-};
-
// Iceberg table-level reader。
// 该层继承 TableReader,复用多文件编排和动态分区裁剪等通用能力;同时组合
// FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
@@ -90,18 +65,12 @@ public:
~IcebergTableReader() override = default;
// 初始化一次 Iceberg table scan。
- // params 必须一次性提供 schema、projection/filter、scan tasks 和底层 FileReader;
- // 这样 IcebergTableReader 不会暴露 bind/set_tasks 等半初始化阶段。
- Status init(IcebergTableReadParams params) {
+ // options 必须一次性提供 schema、projection/filter 和 scan tasks,避免暴露
+ // bind/set_tasks 等半初始化阶段。
+ Status init(reader::TableReadOptions options) override {
// 一次性保存 Iceberg table scan 所需输入。TableReader 负责 reader 切换流程;
// IcebergTableReader 只提供后续要打开的 task 以及 table/file schema 映射语义。
- _iceberg_options = params.options;
- _iceberg_schema = std::move(params.iceberg_schema);
- _table_scan_request = std::move(params.scan_request);
- _scan_tasks = std::move(params.scan_tasks);
- _data_reader = std::move(params.data_reader);
- _next_task_idx = 0;
- return reader::TableReader::init(_iceberg_options.table_options);
+ return reader::TableReader::init(std::move(options));
}
// 关闭当前 Iceberg scan。
@@ -114,75 +83,20 @@ public:
}
protected:
- // 打开单个 Iceberg scan task。
- // 该方法完成当前 data file 的 schema mapping、filter localization、position delete
- // 注入,并初始化底层 FileReader;它由 TableReader 的 reader 切换流程调用。
- Status open_task(const IcebergScanTask& task) {
- // 真实实现会读取 data file schema,创建 field-id mapping,应用 position deletes,
- // 并初始化底层 ParquetReader。
- _scan_task = task;
- std::vector<reader::SchemaField> file_schema;
- if (_data_reader) {
- RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
- }
- reader::TableColumnMapperOptions mapper_options;
- mapper_options.mode = reader::TableColumnMappingMode::BY_FIELD_ID;
- _column_mapper = reader::TableColumnMapper(mapper_options);
- RETURN_IF_ERROR(_column_mapper.create_mapping(_iceberg_schema,
file_schema, &_mappings));
-
- reader::FileScanRequest file_request;
-
RETURN_IF_ERROR(_column_mapper.create_scan_request(_table_scan_request,
_mappings,
- &file_request));
- RETURN_IF_ERROR(apply_position_deletes(&file_request));
- if (_data_reader) {
- RETURN_IF_ERROR(_data_reader->init(file_request));
- }
- return Status::OK();
- }
-
- // 打开下一个 Iceberg task。
- // TableReader 负责循环和 EOF 处理;这里仅从 _scan_tasks 中取下一个 task 并调用
- // open_task。
- Status open_next_reader(bool* has_reader) override {
- if (_next_task_idx >= _scan_tasks.size()) {
- if (has_reader != nullptr) {
- *has_reader = false;
- }
- return Status::OK();
- }
- RETURN_IF_ERROR(open_task(_scan_tasks[_next_task_idx++]));
- if (has_reader != nullptr) {
- *has_reader = true;
- }
- return Status::OK();
- }
-
- // 读取当前 Iceberg task 的下一批 table block。
- // 这里组合底层 FileReader 输出的 file-local block,并负责 equality delete、
- // virtual columns 和 finalize,最终输出 table/global schema block。
- Status read_current(Block* table_block, size_t* rows, bool* eof) override {
- // 真实实现会读取 file-local block,finalize 成 table block,再应用 equality delete
- // 和 Iceberg virtual columns。stub 默认 EOF。
- // 后续实现应在 IcebergTableReader 内部持有 file-local block;这里仅复用输出指针
- // 作为 header-only API 占位,避免在骨架阶段引入 Block 的完整定义。
- Block* file_block = table_block;
- if (_data_reader) {
- RETURN_IF_ERROR(_data_reader->next(file_block, rows, eof));
- }
- RETURN_IF_ERROR(finalize_chunk(file_block, table_block));
- RETURN_IF_ERROR(apply_equality_deletes(table_block));
- RETURN_IF_ERROR(materialize_virtual_columns(table_block, rows !=
nullptr ? *rows : 0));
- return Status::OK();
- }
-
// 将 file-local block 转换为 table/global schema block。
// 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
// 物化以及复杂列 remap。
- Status finalize_chunk(Block* file_block, Block* table_block) {
+ Status finalize_chunk(Block* block) override {
// 真实实现会根据 ColumnMapping 执行 finalize_expr/default/partition/generated
// expressions,把 file-local block 写成 table block。
- (void)file_block;
- (void)table_block;
+ RETURN_IF_ERROR(apply_equality_deletes(block));
+ return Status::OK();
+ }
+
+ // 物化 Iceberg 虚拟列。
+ // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
+ Status materialize_virtual_columns(Block* table_block) override {
+ // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
return Status::OK();
}
@@ -196,40 +110,10 @@ protected:
// 在 table block 上应用 equality delete。
// equality delete 依赖 table-level 列语义,因此不能下沉到 ParquetReader。
- Status apply_equality_deletes(Block* table_block) {
+ Status apply_equality_deletes(Block* block) {
// 真实实现会在 table block 上应用 equality delete。
- (void)table_block;
- return Status::OK();
- }
-
- // 物化 Iceberg 虚拟列。
- // 例如 _row_id、_last_updated_sequence_number 等,它们不来自 Parquet 文件物理列。
- Status materialize_virtual_columns(Block* table_block, size_t rows) {
- // 真实实现会物化 _row_id、_last_updated_sequence_number 等 Iceberg 虚拟列。
- (void)table_block;
- (void)rows;
return Status::OK();
}
-
- // 关闭当前 task 对应的底层 FileReader。
- // 该方法由 TableReader 在切换 reader 或 close 时调用,要求可重复调用。
- Status close_current_reader() override {
- if (_data_reader) {
- RETURN_IF_ERROR(_data_reader->close());
- }
- return Status::OK();
- }
-
-private:
- IcebergReadOptions _iceberg_options;
- IcebergScanTask _scan_task;
- std::vector<IcebergScanTask> _scan_tasks;
- size_t _next_task_idx = 0;
- reader::TableScanRequest _table_scan_request;
- std::vector<reader::TableColumn> _iceberg_schema;
- std::vector<reader::ColumnMapping> _mappings;
- reader::TableColumnMapper _column_mapper;
- std::unique_ptr<reader::FileReader> _data_reader;
};
} // namespace doris::iceberg
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]