This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this
push:
new e5d17b881ba refactor table reader (#63397)
e5d17b881ba is described below
commit e5d17b881ba4049948a9370db285eaf3f280ed38
Author: Gabriel <[email protected]>
AuthorDate: Tue May 19 16:32:59 2026 +0800
refactor table reader (#63397)
---
be/src/exec/scan/file_scanner.cpp | 1 -
be/src/exec/scan/file_scanner.h | 4 +-
be/src/exprs/vliteral.cpp | 6 -
be/src/exprs/vliteral.h | 8 +-
be/src/exprs/vslot_ref.h | 4 +
be/src/format/reader/column_mapper.cpp | 137 ++++++++++++++++
be/src/format/reader/column_mapper.h | 124 ++++++++++++++
be/src/format/reader/expr/literal.h | 35 ++++
be/src/format/reader/expr/slot_ref.h | 39 +++++
be/src/format/reader/file_reader.h | 27 ++-
be/src/format/reader/table_reader.h | 255 ++++++++---------------------
be/src/format/table/iceberg_reader_mixin.h | 3 -
be/src/format/table/iceberg_reader_v2.h | 20 ---
13 files changed, 440 insertions(+), 223 deletions(-)
diff --git a/be/src/exec/scan/file_scanner.cpp
b/be/src/exec/scan/file_scanner.cpp
index 5f1d248c1e1..0ba7266456e 100644
--- a/be/src/exec/scan/file_scanner.cpp
+++ b/be/src/exec/scan/file_scanner.cpp
@@ -1791,7 +1791,6 @@ Status FileScanner::_init_expr_ctxes() {
if (is_file_slot) {
_is_file_slot.emplace(slot_id);
_file_slot_descs.emplace_back(it->second);
- _file_col_names.push_back(it->second->col_name());
}
_column_descs.push_back(col_desc);
diff --git a/be/src/exec/scan/file_scanner.h b/be/src/exec/scan/file_scanner.h
index cd4066ec987..7c3d9d08b6a 100644
--- a/be/src/exec/scan/file_scanner.h
+++ b/be/src/exec/scan/file_scanner.h
@@ -133,8 +133,6 @@ protected:
bool _cur_reader_eof = false;
// File source slot descriptors
std::vector<SlotDescriptor*> _file_slot_descs;
- // col names from _file_slot_descs
- std::vector<std::string> _file_col_names;
// Unified column descriptors for init_reader (includes file, partition,
missing, synthesized cols)
std::vector<ColumnDescriptor> _column_descs;
@@ -147,6 +145,7 @@ protected:
// dest slot name to index in _dest_vexpr_ctx;
std::unordered_map<std::string, int> _dest_slot_name_to_idx;
// col name to default value expr
+ // TODO: only used by json reader. Could we delete this?
std::unordered_map<std::string, VExprContextSPtr> _col_default_value_ctx;
// the map values of dest slot id to src slot desc
// if there is not key of dest slot id in
dest_sid_to_src_sid_without_trans, it will be set to nullptr
@@ -193,7 +192,5 @@ protected:
 std::unique_ptr<io::IOContext> _io_ctx;
- // Whether to fill partition columns from path, default is true.
- bool _fill_partition_from_path = true;
 std::unordered_map<std::string, std::tuple<std::string, const
SlotDescriptor*>>
 _partition_col_descs;
 std::unordered_map<std::string, bool> _partition_value_is_null;
diff --git a/be/src/exprs/vliteral.cpp b/be/src/exprs/vliteral.cpp
index 551839f699e..9b93d709727 100644
--- a/be/src/exprs/vliteral.cpp
+++ b/be/src/exprs/vliteral.cpp
@@ -37,12 +37,6 @@ namespace doris {
class VExprContext;
-void VLiteral::init(const TExprNode& node) {
- Field field;
- field = _data_type->get_field(node);
- _column_ptr = _data_type->create_column_const(1, field);
-}
-
Status VLiteral::prepare(RuntimeState* state, const RowDescriptor& desc,
VExprContext* context) {
RETURN_IF_ERROR_OR_PREPARED(VExpr::prepare(state, desc, context));
return Status::OK();
diff --git a/be/src/exprs/vliteral.h b/be/src/exprs/vliteral.h
index b1b8e89157d..e5a4c7a5f3d 100644
--- a/be/src/exprs/vliteral.h
+++ b/be/src/exprs/vliteral.h
@@ -39,7 +39,9 @@ public:
VLiteral(const TExprNode& node, bool should_init = true)
: VExpr(node), _expr_name(_data_type->get_name()) {
if (should_init) {
- init(node);
+ Field field;
+ field = _data_type->get_field(node);
+ _column_ptr = _data_type->create_column_const(1, field);
}
}
@@ -69,11 +71,10 @@ public:
 uint64_t get_digest(uint64_t seed) const override;
 protected:
+ // Used by subclasses that fill _column_ptr themselves (e.g. TableLiteral).
+ explicit VLiteral(const DataTypePtr& type) : VExpr(type, false), _expr_name(type->get_name()) {}
 ColumnPtr _column_ptr;
 std::string _expr_name;
-
-private:
- void init(const TExprNode& node);
 };
} // namespace doris
diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 21b5735753b..3ac9f641c19 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -73,6 +73,10 @@ public:
double execute_cost() const override { return 0.0; }
+protected:
+ VSlotRef(int slot_id, int column_id, int column_uniq_id)
+ : _slot_id(slot_id), _column_id(column_id),
_column_uniq_id(column_uniq_id) {}
+
private:
int _slot_id;
int _column_id;
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
new file mode 100644
index 00000000000..7006365b054
--- /dev/null
+++ b/be/src/format/reader/column_mapper.cpp
@@ -0,0 +1,169 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "format/reader/column_mapper.h"
+
+#include <map>
+#include <string>
+#include <vector>
+
+#include "common/status.h"
+#include "format/reader/expr/literal.h"
+#include "format/reader/expr/slot_ref.h"
+#include "format/reader/file_reader.h"
+#include "format/reader/table_reader.h"
+
+namespace doris::reader {
+
+// Builds one ColumnMapping per projected table column (in projection order) and
+// rebuilds `block_schema` to describe the blocks the FileReader is expected to
+// return: one entry per projected column found in the file.
+//
+// Each table column is resolved, in this order, from:
+//   1. a matching file column (see _find_file_field);
+//   2. its default expression;
+//   3. the partition value of the current split (partition keys only).
+// A partition key without a value is an error; any other unresolved column is an
+// error unless allow_missing_columns is set.
+//
+// TODO: type conversion, child mapping of complex columns and generated columns.
+Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& projected_columns,
+                                         std::vector<SchemaField>& block_schema,
+                                         const std::map<std::string, Field>& partition_values,
+                                         const std::vector<SchemaField>& file_schema) {
+    _mappings.clear();
+    block_schema.clear();
+    for (const auto& table_column : projected_columns) {
+        ColumnMapping mapping;
+        mapping.table_column_id = table_column.id;
+        mapping.table_type = table_column.type;
+        if (const auto* file_field = _find_file_field(table_column, file_schema)) {
+            mapping.file_column_id = file_field->id;
+            mapping.file_type = file_field->type;
+            mapping.is_trivial = _is_same_type(mapping.table_type, mapping.file_type);
+            if (!mapping.is_trivial) {
+                // TODO: support type conversion between file and table types.
+                return Status::NotSupported(
+                        "column mapping with type conversion is not supported yet: table column "
+                        "'{}' (id={}, type={}) vs file column (id={}, type={})",
+                        table_column.name, mapping.table_column_id, mapping.table_type->get_name(),
+                        mapping.file_column_id.value(), mapping.file_type->get_name());
+            }
+            // The file column is appended at position block_schema.size() of the file
+            // block, so the projection is a plain reference to that position.
+            mapping.projection = VExprContext::create_shared(TableSlotRef::create_shared(
+                    *mapping.file_column_id, static_cast<int>(block_schema.size()), -1,
+                    mapping.table_type));
+            block_schema.push_back(SchemaField {mapping.file_column_id.value(), table_column.name,
+                                                mapping.table_type, {}});
+        } else if (table_column.default_expr != nullptr) {
+            mapping.is_constant = true;
+            mapping.default_expr = table_column.default_expr;
+            mapping.projection = mapping.default_expr;
+        } else if (table_column.is_partition_key) {
+            const auto it = partition_values.find(table_column.name);
+            if (it == partition_values.end()) {
+                return Status::InvalidArgument(
+                        "Table column '{}' (id={}) does not have a matching partition value",
+                        table_column.name, table_column.id);
+            }
+            mapping.is_constant = true;
+            mapping.default_expr = VExprContext::create_shared(
+                    TableLiteral::create_shared(mapping.table_type, it->second));
+            mapping.projection = mapping.default_expr;
+        } else if (!_options.allow_missing_columns) {
+            return Status::InvalidArgument(
+                    "Table column '{}' (id={}) does not have a matching file column",
+                    table_column.name, table_column.id);
+        }
+        // NOTE: a missing column accepted via allow_missing_columns has no projection
+        // yet. TODO: materialize it as a NULL/default literal.
+        _mappings.push_back(std::move(mapping));
+    }
+    return Status::OK();
+}
+
+// Translates the table-level projection and filters into a file-local request.
+// Only projected columns that exist in the current file are referenced; columns
+// served by a constant projection (default/partition) never reach the FileReader.
+// Must be called after create_mapping().
+Status TableColumnMapper::create_scan_request(const std::map<int32_t, TableFilter>& table_filters,
+                                              const std::vector<TableColumn>& projected_columns,
+                                              FileScanRequest* file_request) {
+    file_request->predicate_columns.clear();
+    file_request->non_predicate_columns.clear();
+    file_request->projected_columns.clear();
+    file_request->local_filters.clear();
+    file_request->reader_expression_map.clear();
+    for (const auto& table_column : projected_columns) {
+        const auto* mapping = _find_mapping(table_column.id);
+        if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+            continue;
+        }
+        // Same order as the block_schema built by create_mapping().
+        file_request->projected_columns.push_back(*mapping->file_column_id);
+        if (table_filters.count(table_column.id) == 0) {
+            file_request->non_predicate_columns.push_back(*mapping->file_column_id);
+        }
+    }
+    RETURN_IF_ERROR(localize_filters(table_filters, file_request));
+    return Status::OK();
+}
+
+// Localizes table-level filters to file columns. Filters whose column has no
+// mapping or is absent from the current file are skipped here.
+// TODO: handle safe casts, the reader-expression fallback and finalize-only filters.
+Status TableColumnMapper::localize_filters(const std::map<int32_t, TableFilter>& table_filters,
+                                           FileScanRequest* file_request) const {
+    for (const auto& [table_column_id, table_filter] : table_filters) {
+        const auto* mapping = _find_mapping(table_column_id);
+        if (mapping == nullptr || !mapping->file_column_id.has_value()) {
+            continue;
+        }
+        if (!table_filter.can_be_localized()) {
+            // TODO: Rewrite table filter to reader_expression_map
+            // file_request->reader_expression_map.emplace_back(mapping->table_column_id,
+            //                                                  table_filter.conjunct);
+        } else {
+            FileLocalFilter local_filter;
+            local_filter.file_column_id = *mapping->file_column_id;
+            local_filter.conjunct = table_filter.conjunct;
+            local_filter.predicates = table_filter.predicates;
+            file_request->local_filters.push_back(std::move(local_filter));
+        }
+        file_request->predicate_columns.push_back(*mapping->file_column_id);
+    }
+    return Status::OK();
+}
+
+// Returns the file field matching `table_column` under the configured mode, or
+// nullptr. The modes are exclusive: BY_FIELD_ID never falls back to names, because
+// a name may be reused by a different field after a schema change.
+const SchemaField* TableColumnMapper::_find_file_field(
+        const TableColumn& table_column, const std::vector<SchemaField>& file_schema) const {
+    const bool by_field_id = _options.mode == TableColumnMappingMode::BY_FIELD_ID;
+    for (const auto& field : file_schema) {
+        const bool matched =
+                by_field_id ? field.id == table_column.id : field.name == table_column.name;
+        if (matched) {
+            return &field;
+        }
+    }
+    return nullptr;
+}
+
+} // namespace doris::reader
diff --git a/be/src/format/reader/column_mapper.h
b/be/src/format/reader/column_mapper.h
new file mode 100644
index 00000000000..4c6b510ff0e
--- /dev/null
+++ b/be/src/format/reader/column_mapper.h
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstddef>
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <optional>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "core/data_type/data_type.h"
+#include "exprs/vexpr_fwd.h"
+#include "format/reader/expr/literal.h"
+
+namespace doris::reader {
+
+struct TableColumn;
+struct TableFilter;
+struct SchemaField;
+struct FileScanRequest;
+
+// How a table column is matched against the columns of a data file.
+enum class TableColumnMappingMode {
+    // Match SchemaField::id against TableColumn::id.
+    BY_FIELD_ID,
+    // Match SchemaField::name against TableColumn::name.
+    BY_NAME,
+};
+
+// Mapping result from a single table column to a file column.
+// This is the core boundary object between the table layer and the file layer.
+struct ColumnMapping {
+    int32_t table_column_id = -1;
+    // Set only when the table column exists in the current file.
+    std::optional<int32_t> file_column_id;
+    DataTypePtr file_type;
+    DataTypePtr table_type;
+
+    // Final output expression that turns a file-local value into the table/global
+    // value, e.g. cast, default, partition, generated column or complex-column remap.
+    VExprContextSPtr projection;
+
+    // Read-time filter fallback expression. Only used when a table filter cannot be
+    // safely converted into a file-local predicate; it serves reader_expression_map and
+    // is not equivalent to `projection`.
+    VExprContextSPtr reader_filter_expr;
+
+    std::vector<ColumnMapping> child_mappings;
+    // True when the file type equals the table type (no conversion needed).
+    bool is_trivial = false;
+    // True when the value does not come from the file (default or partition value).
+    bool is_constant = false;
+    VExprContextSPtr default_expr;
+};
+
+struct TableColumnMapperOptions {
+    TableColumnMappingMode mode = TableColumnMappingMode::BY_FIELD_ID;
+    // When false, a table column that cannot be resolved makes create_mapping() fail.
+    bool allow_missing_columns = true;
+    bool enable_reader_expression_fallback = true;
+};
+
+// Generic table-schema to file-schema mapping layer.
+// Iceberg uses BY_FIELD_ID; plain by-name scenarios can reuse this component, so it
+// must not be named as an Iceberg-only component.
+class TableColumnMapper {
+public:
+    explicit TableColumnMapper(TableColumnMapperOptions options = {})
+            : _options(std::move(options)) {}
+    virtual ~TableColumnMapper() = default;
+
+    // Builds the column mapping from the table schema to the file schema.
+    // Each ColumnMapping describes how a table column is obtained from a file column, a
+    // constant or an expression; projection, filter localization and table block
+    // finalization all reuse this mapping.
+    // `block_schema` is cleared and filled with the schema of the blocks the FileReader
+    // returns (the projected columns present in the file, in projection order).
+    virtual Status create_mapping(const std::vector<TableColumn>& projected_columns,
+                                  std::vector<SchemaField>& block_schema,
+                                  const std::map<std::string, Field>& partition_values,
+                                  const std::vector<SchemaField>& file_schema);
+
+    // Converts the table-level projection and filters into a file-local scan request.
+    // `file_request` only carries what a FileReader understands: file-local column ids
+    // (predicate/non-predicate/projected), local_filters and reader_expression_map.
+    virtual Status create_scan_request(const std::map<int32_t, TableFilter>& table_filters,
+                                       const std::vector<TableColumn>& projected_columns,
+                                       FileScanRequest* file_request);
+
+    // Localizes table-level filters to the file schema.
+    // Trivial mappings can copy structured predicates directly; with a type change a safe
+    // cast may be attempted; expressions that cannot be pushed down safely should go
+    // through reader_expression_map or a table-level finalize/filter fallback.
+    virtual Status localize_filters(const std::map<int32_t, TableFilter>& table_filters,
+                                    FileScanRequest* file_request) const;
+
+    void clear() { _mappings.clear(); }
+    const std::vector<ColumnMapping>& mappings() const { return _mappings; }
+
+private:
+    const SchemaField* _find_file_field(const TableColumn& table_column,
+                                        const std::vector<SchemaField>& file_schema) const;
+
+    const ColumnMapping* _find_mapping(int32_t table_column_id) const {
+        for (const auto& mapping : _mappings) {
+            if (mapping.table_column_id == table_column_id) {
+                return &mapping;
+            }
+        }
+        return nullptr;
+    }
+
+    // Types are compared by value, not by pointer identity: two DataTypePtr
+    // instances describing the same type are not necessarily the same object.
+    bool _is_same_type(const DataTypePtr& table_type, const DataTypePtr& file_type) const {
+        if (table_type == nullptr || file_type == nullptr) {
+            return table_type == file_type;
+        }
+        return table_type->equals(*file_type);
+    }
+
+    TableColumnMapperOptions _options;
+    std::vector<ColumnMapping> _mappings;
+};
+
+} // namespace doris::reader
diff --git a/be/src/format/reader/expr/literal.h
b/be/src/format/reader/expr/literal.h
new file mode 100644
index 00000000000..9c4202994ee
--- /dev/null
+++ b/be/src/format/reader/expr/literal.h
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "core/data_type/data_type.h"
+#include "exprs/vliteral.h"
+
+namespace doris {
+
+// Constant expression built directly from a DataType and a Field instead of a
+// thrift TExprNode; TableColumnMapper uses it to materialize partition values.
+class TableLiteral : public VLiteral {
+    ENABLE_FACTORY_CREATOR(TableLiteral);
+
+public:
+    // Holds `field` as a one-row constant column of `type`.
+    TableLiteral(const DataTypePtr& type, const Field& field) : VLiteral(type) {
+        _data_type = type;
+        // Name the expression after its type, as literals built from a TExprNode are.
+        _expr_name = _data_type->get_name();
+        _column_ptr = _data_type->create_column_const(1, field);
+    }
+};
+
+} // namespace doris
diff --git a/be/src/format/reader/expr/slot_ref.h
b/be/src/format/reader/expr/slot_ref.h
new file mode 100644
index 00000000000..6b5d027602e
--- /dev/null
+++ b/be/src/format/reader/expr/slot_ref.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "core/data_type/data_type.h"
+#include "exprs/vslot_ref.h"
+
+namespace doris {
+
+// Slot reference that addresses a column position of a block assembled by the
+// table reader (see TableColumnMapper::create_mapping), with its result type
+// supplied at construction time.
+class TableSlotRef : public VSlotRef {
+    ENABLE_FACTORY_CREATOR(TableSlotRef);
+
+public:
+    // slot_id, column_id and column_uniq_id are forwarded unchanged to VSlotRef;
+    // `type` becomes the expression's data type.
+    TableSlotRef(int slot_id, int column_id, int column_uniq_id, const DataTypePtr& type)
+            : VSlotRef(slot_id, column_id, column_uniq_id) {
+        _data_type = type;
+    }
+
+    // Succeeds without looking at its arguments; VSlotRef::prepare is not invoked.
+    // NOTE(review): VExpr::prepare is not invoked either; confirm that no base-class
+    // state it sets is required before execute().
+    Status prepare(RuntimeState* /*state*/, const RowDescriptor& /*desc*/,
+                   VExprContext* /*context*/) override {
+        return Status::OK();
+    }
+};
+
+} // namespace doris
diff --git a/be/src/format/reader/file_reader.h
b/be/src/format/reader/file_reader.h
index e9fb0f3c963..edebdcaff42 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -47,7 +47,7 @@ using ColumnId = int32_t;
// schema 语义。Iceberg field id、name mapping、default/generated/partition 列都不在
// FileReader 内部解释。
struct SchemaField {
- ColumnId id = -1;
+ int32_t id = -1;
std::string name;
DataTypePtr type;
std::vector<SchemaField> children;
@@ -65,9 +65,16 @@ struct FileLocalFilter {
 // 结构化列谓词。适合文件层 pruning,例如 min/max、page index、dictionary、
 // bloom filter 等只理解单列谓词的优化。
+ // TODO: remove once `conjunct` can express all filter semantics.
 std::vector<std::shared_ptr<ColumnPredicate>> predicates;
 };
+enum class FileFormat {
+ PARQUET,
+ ORC,
+ CSV,
+};
+
 // 通用文件层 scan 请求。
 // 该结构描述所有文件格式都可以共享的 file-local 读取输入。这里不出现 table/global
 // schema。所有 schema change、filter localization、default/generated/partition
@@ -75,9 +82,23 @@ struct FileLocalFilter {
 struct FileScanRequest {
 virtual ~FileScanRequest() = default;
- std::vector<ColumnId> projected_file_columns;
+ std::vector<ColumnId> predicate_columns;
+ std::vector<ColumnId> non_predicate_columns;
 std::vector<FileLocalFilter> local_filters;
+ // Fallback path when filters cannot be localized to file-local predicates. The expression can reference projected_columns and partition columns.
 std::vector<std::pair<ColumnId, VExprContextSPtr>> reader_expression_map;
+ // partition key -> value
+ std::map<std::string, Field> partition_values;
+
+ // The ids in projected_columns are file-local column ids; all of them come from the file schema.
+ // For example,
+ // file schema: [0: id (int), 1: name (string), 2: age (int)]
+ // predicate: age > 30
+ // table-level projection: [name, id]
+ // predicate_columns: [2]
+ // non_predicate_columns: [1, 0]
+ // projected_columns are the columns of the blocks returned to the table reader: [1, 0] means only name and id are projected.
+ std::vector<ColumnId> projected_columns;
 };
// 文件物理读取层通用接口。
@@ -113,7 +134,10 @@ public:
 virtual Status init(const FileScanRequest& request) {
 // 真实实现会根据 projected columns、local filters 和 reader expressions
 // 初始化文件格式自己的物理读取计划。
- _request.projected_file_columns = request.projected_file_columns;
+ _request.predicate_columns = request.predicate_columns;
+ _request.non_predicate_columns = request.non_predicate_columns;
+ _request.projected_columns = request.projected_columns;
+ _request.partition_values = request.partition_values;
 _request.local_filters = request.local_filters;
 _request.reader_expression_map = request.reader_expression_map;
 return Status::OK();
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 99dcc507e5d..4d8fe0620c8 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -30,7 +30,10 @@
#include "common/status.h"
#include "core/block/block.h"
#include "core/data_type/data_type.h"
+#include "exprs/vexpr_context.h"
#include "exprs/vexpr_fwd.h"
+#include "format/reader/column_mapper.h"
+#include "format/reader/expr/literal.h"
#include "format/reader/file_reader.h"
namespace doris {
@@ -47,45 +50,22 @@ struct TableColumn {
std::string name;
DataTypePtr type;
std::vector<TableColumn> children;
+ VExprContextSPtr default_expr;
+ bool is_partition_key = false;
};
// table-level filter。
// TableColumnMapper 负责把它转换成 FileLocalFilter 或 reader_expression_map。
struct TableFilter {
- ColumnId table_column_id = -1;
-
// 表达式过滤,适合表达 cast、复杂表达式、复杂列提取等语义。
VExprContextSPtr conjunct;
// 结构化列谓词,适合下推到文件层做 row group stats、page index、dictionary、
// bloom filter 等优化。
+ // TODO: conjunct 支持表达所有 filter 语义之后删除。
std::vector<std::shared_ptr<ColumnPredicate>> predicates;
-};
-
-// 单个 table column 到 file column 的映射结果。
-// 这是 table 层和 file 层的核心边界对象。
-struct ColumnMapping {
- ColumnId table_column_id = -1;
- std::optional<ColumnId> file_column_id;
- DataTypePtr file_type;
- DataTypePtr table_type;
-
- // 最终输出表达式。用于把 file-local value 转成 table/global value,例如 cast、
- // default、partition、generated column 或复杂列 remap。
- VExprContextSPtr finalize_expr;
-
- // 读时过滤 fallback 表达式。只在 table filter 不能安全转换成 file-local predicate
- // 时使用,服务 reader_expression_map,不等价于 finalize_expr。
- VExprContextSPtr reader_filter_expr;
-
- std::vector<ColumnMapping> child_mappings;
- bool is_trivial = false;
- bool is_constant = false;
-};
-enum class TableColumnMappingMode {
- BY_FIELD_ID,
- BY_NAME,
+ bool can_be_localized() const { return true; }
};
enum class TableFilterConversion {
@@ -95,132 +75,6 @@ enum class TableFilterConversion {
FINALIZE_ONLY,
};
-struct TableColumnMapperOptions {
- TableColumnMappingMode mode = TableColumnMappingMode::BY_FIELD_ID;
- bool allow_missing_columns = true;
- bool enable_reader_expression_fallback = true;
-};
-
-// table-level scan 请求。
-// 它仍然使用 table/global schema 语义,不能直接传给 FileReader。
-struct TableScanRequest {
- std::vector<TableColumn> projected_table_columns;
- std::vector<TableFilter> table_filters;
-};
-
-// 通用 table schema 到 file schema 映射层。
-// Iceberg 会使用 BY_FIELD_ID;普通 by-name 场景可以复用该组件,但不应把它命名成
-// Iceberg-only 组件。
-class TableColumnMapper {
-public:
- explicit TableColumnMapper(TableColumnMapperOptions options = {})
- : _options(std::move(options)) {}
- virtual ~TableColumnMapper() = default;
-
- // 建立 table schema 到 file schema 的列映射。
- // 输出的 ColumnMapping 描述 table column 如何从 file column、常量列或表达式得到;
- // 后续 projection、filter localization 和 table block finalize 都应复用这份映射。
- virtual Status create_mapping(const std::vector<TableColumn>& table_schema,
- const std::vector<SchemaField>& file_schema,
- std::vector<ColumnMapping>* mappings) {
- // 真实实现会做 field id/name matching、类型转换、复杂列 child mapping、缺失列
- // default/partition/generated 表达式构造。
- mappings->clear();
- for (const auto& table_column : table_schema) {
- ColumnMapping mapping;
- mapping.table_column_id = table_column.id;
- mapping.table_type = table_column.type;
- if (const auto* file_field = find_file_field(table_column,
file_schema)) {
- mapping.file_column_id = file_field->id;
- mapping.file_type = file_field->type;
- mapping.is_trivial = is_same_type(mapping.table_type,
mapping.file_type);
- } else {
- mapping.is_constant = true;
- }
- mappings->push_back(std::move(mapping));
- }
- _mappings = *mappings;
- return Status::OK();
- }
-
- // 把 table-level scan 请求转换成 file-local scan 请求。
- // table_request 使用 table/global schema;file_request 只包含 FileReader 能理解的
- // projected_file_columns、local_filters 和 reader_expression_map。
- virtual Status create_scan_request(const TableScanRequest& table_request,
- const std::vector<ColumnMapping>&
mappings,
- FileScanRequest* file_request) {
- // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
- file_request->projected_file_columns.clear();
- file_request->local_filters.clear();
- file_request->reader_expression_map.clear();
- _mappings = mappings;
- for (const auto& table_column : table_request.projected_table_columns)
{
- const auto* mapping = find_mapping(table_column.id);
- if (mapping != nullptr && mapping->file_column_id.has_value()) {
-
file_request->projected_file_columns.push_back(*mapping->file_column_id);
- }
- }
- RETURN_IF_ERROR(localize_filters(table_request.table_filters,
file_request));
- return Status::OK();
- }
-
- // 将 table-level filter 定位到文件 schema。
- // trivial mapping 可以直接复制结构化谓词;类型变化时可以尝试安全 cast;无法安全
- // 下推的表达式应通过 reader_expression_map 或 table-level finalize/filter fallback
处理。
- virtual Status localize_filters(const std::vector<TableFilter>&
table_filters,
- FileScanRequest* file_request) const {
- // 真实实现会处理 trivial mapping、safe cast、reader expression fallback 和
- // finalize-only filter。stub 只复制能够直接定位到 file column 的谓词。
- for (const auto& filter : table_filters) {
- const auto* mapping = find_mapping(filter.table_column_id);
- if (mapping == nullptr || !mapping->file_column_id.has_value()) {
- continue;
- }
- FileLocalFilter local_filter;
- local_filter.file_column_id = *mapping->file_column_id;
- local_filter.conjunct = filter.conjunct;
- local_filter.predicates = filter.predicates;
- file_request->local_filters.push_back(std::move(local_filter));
- }
- return Status::OK();
- }
-
- const std::vector<ColumnMapping>& mappings() const { return _mappings; }
-
-private:
- const SchemaField* find_file_field(const TableColumn& table_column,
- const std::vector<SchemaField>&
file_schema) const {
- for (const auto& field : file_schema) {
- if (_options.mode == TableColumnMappingMode::BY_FIELD_ID &&
- field.id == table_column.id) {
- return &field;
- }
- if (_options.mode == TableColumnMappingMode::BY_NAME &&
- field.name == table_column.name) {
- return &field;
- }
- }
- return nullptr;
- }
-
- const ColumnMapping* find_mapping(ColumnId table_column_id) const {
- for (const auto& mapping : _mappings) {
- if (mapping.table_column_id == table_column_id) {
- return &mapping;
- }
- }
- return nullptr;
- }
-
- bool is_same_type(const DataTypePtr& table_type, const DataTypePtr&
file_type) const {
- return table_type == file_type;
- }
-
-private:
- TableColumnMapperOptions _options;
- std::vector<ColumnMapping> _mappings;
-};
-
struct BaseDataFile {
virtual ~BaseDataFile() = default;
@@ -236,13 +90,23 @@ struct ScanTask {
 std::unique_ptr<BaseDataFile> data_file;
 };
+struct ReadProfile {};
+
 struct TableReadOptions {
- size_t batch_size = 4096;
- // TODO: deleted? SCHEMA should be derived from table metadata and inited
by TableReader it self? it shouldn't be part of read options.
- std::vector<TableColumn> schema;
- TableScanRequest scan_request;
+ // Keep these members non-const: TableReader default-constructs `_options` and
+ // move-assigns it in init().
+ std::vector<TableColumn> projected_columns;
+ // All conjuncts from scan operator
+ VExprContextSPtrs conjuncts;
+ FileFormat format = FileFormat::PARQUET;
 // Each task denotes a descriptor of a single file to read, along with
file-level metadata such as stats and delete files.
 std::vector<std::unique_ptr<ScanTask>> scan_tasks;
+
+ std::unique_ptr<ReadProfile> profile;
+};
+
+struct SplitReadOptions {
+ std::map<std::string, Field> partition_values;
 };
 // table-level reader 基类。
@@ -255,10 +117,22 @@ public:
 // 初始化 table reader 的通用运行参数。
 // 子类可以在自己的 init(options) 中调用该方法;这里不接收具体表格式 schema/task。
 virtual Status init(TableReadOptions options) {
- _schema = std::move(_options.schema);
- _table_scan_request = std::move(_options.scan_request);
+ // Keep the options: open_reader() reads _options.projected_columns later.
+ _options = std::move(options);
 _scan_tasks = std::move(_options.scan_tasks);
 _next_task_idx = 0;
+ _profile = std::move(_options.profile);
+ TableColumnMapperOptions mapper_options;
+ mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+ _data_reader.column_mapper = TableColumnMapper(mapper_options);
+ // TODO:
+ // _table_filters = build_table_filters_from_conjuncts(_options.conjuncts);
+ return Status::OK();
+ }
+
+ // Called before reading each split/partition.
+ virtual Status prepare_split(SplitReadOptions options) {
+ _partition_values = std::move(options.partition_values);
 return Status::OK();
 }
@@ -278,20 +150,37 @@ public:
 // 基类负责 current reader 的打开、EOF 后切换和关闭;子类只实现 protected hook。
 // table_block 的列必须已经是 table/global schema 语义。
 Status get_block(Block* block, bool* eos) {
 if (eos != nullptr) {
 *eos = false;
 }
 while (block->empty() && !*eos) {
- if (!_data_reader) {
+ if (!_data_reader.reader) {
 RETURN_IF_ERROR(create_next_reader(eos));
- if (!_data_reader) {
+ if (!_data_reader.reader) {
 DCHECK(*eos);
 return Status::OK();
 }
 }
 bool current_eof = false;
- RETURN_IF_ERROR(_data_reader->get_block(block, &current_eof));
+ Block current_block;
+ for (const auto& field : _data_reader.block_schema) {
+ // TODO: reuse column's memory
+ current_block.insert({field.type->create_column(), field.type, field.name});
+ }
+ RETURN_IF_ERROR(_data_reader.reader->get_block(&current_block, &current_eof));
+
+ size_t idx = 0;
+ for (const auto& mapping : _data_reader.column_mapper.mappings()) {
+ // Columns without a projection (e.g. absent from the file) cannot be materialized yet.
+ if (mapping.projection == nullptr) {
+ return Status::InternalError("table column (id={}) has no projection expression", mapping.table_column_id);
+ }
+ int res_id = -1;
+ RETURN_IF_ERROR(mapping.projection->execute(&current_block, &res_id));
+ block->replace_by_position(idx, current_block.get_columns()[res_id]);
+ idx++;
+ }
 RETURN_IF_ERROR(finalize_chunk(block));
 RETURN_IF_ERROR(materialize_virtual_columns(block));
 if (current_eof) {
@@ -304,7 +186,7 @@ public:
// 关闭 table reader 及当前正在读取的底层 reader。
// 子类如果持有额外表格式资源,应 override 后先调用 TableReader::close()。
virtual Status close() {
- if (_data_reader) {
+ if (_data_reader.reader) {
RETURN_IF_ERROR(close_current_reader());
}
return Status::OK();
@@ -315,10 +197,10 @@ protected:
// 该方法先关闭当前 reader,再打开下一个具体 reader;子类不应重复实现这个循环。
Status create_next_reader(bool* eos) {
// 多文件切换的公共流程留在基类:关闭当前 reader,然后打开下一个 reader。
- DCHECK(_data_reader == nullptr);
+ DCHECK(_data_reader.reader == nullptr);
// TODO: 创建_data_reader
// _data_reader = std::make_unique<FileReader>(...);
- if (!_data_reader) {
+ if (!_data_reader.reader) {
if (eos != nullptr) {
*eos = true;
}
@@ -332,24 +214,25 @@ protected:
// 子类在这里基于当前 split/task 初始化底层 FileReader。
virtual Status open_reader() {
std::vector<SchemaField> file_schema;
- RETURN_IF_ERROR(_data_reader->get_schema(&file_schema));
- TableColumnMapperOptions mapper_options;
- mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
- _column_mapper = TableColumnMapper(mapper_options);
- RETURN_IF_ERROR(_column_mapper.create_mapping(_schema, file_schema,
&_mappings));
+ RETURN_IF_ERROR(_data_reader.reader->get_schema(&file_schema));
+
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_options.projected_columns,
+
_data_reader.block_schema,
+
_partition_values, file_schema));
FileScanRequest file_request;
- RETURN_IF_ERROR(
- _column_mapper.create_scan_request(_table_scan_request,
_mappings, &file_request));
- RETURN_IF_ERROR(_data_reader->init(file_request));
+ RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
+ _table_filters, _options.projected_columns, &file_request));
+ RETURN_IF_ERROR(_data_reader.reader->init(file_request));
return Status::OK();
}
// 关闭当前具体 reader。
// 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
virtual Status close_current_reader() {
- RETURN_IF_ERROR(_data_reader->close());
- _data_reader.reset();
+ RETURN_IF_ERROR(_data_reader.reader->close());
+ _data_reader.reader.reset();
+ _data_reader.column_mapper.clear();
+ _data_reader.block_schema.clear();
return Status::OK();
}
@@ -365,14 +248,20 @@ protected:
return Status::OK();
}
+ struct DataReader {
+ std::unique_ptr<FileReader> reader;
+ TableColumnMapper column_mapper;
+ // Schema of blocks from file reader.
+ std::vector<SchemaField> block_schema;
+ };
+ DataReader _data_reader;
TableReadOptions _options;
- std::unique_ptr<FileReader> _data_reader;
std::vector<std::unique_ptr<ScanTask>> _scan_tasks;
- TableScanRequest _table_scan_request;
- std::vector<TableColumn> _schema;
- std::vector<ColumnMapping> _mappings;
- TableColumnMapper _column_mapper;
+ // partition key -> value
+ std::map<std::string, Field> _partition_values;
size_t _next_task_idx = 0;
+ std::map<int32_t, TableFilter> _table_filters;
+ std::unique_ptr<ReadProfile> _profile;
};
} // namespace doris::reader
diff --git a/be/src/format/table/iceberg_reader_mixin.h
b/be/src/format/table/iceberg_reader_mixin.h
index 42c80c9b7d4..c9c84639b8f 100644
--- a/be/src/format/table/iceberg_reader_mixin.h
+++ b/be/src/format/table/iceberg_reader_mixin.h
@@ -341,9 +341,6 @@ protected:
// id -> block column name
std::unordered_map<int, std::string> _id_to_block_column_name;
- // File column names used during init
- std::vector<std::string> _file_col_names;
-
std::function<std::shared_ptr<segment_v2::RowIdColumnIteratorV2>()>
_create_topn_row_id_column_iterator;
diff --git a/be/src/format/table/iceberg_reader_v2.h
b/be/src/format/table/iceberg_reader_v2.h
index 3ddadc9f9de..fc957eda124 100644
--- a/be/src/format/table/iceberg_reader_v2.h
+++ b/be/src/format/table/iceberg_reader_v2.h
@@ -60,28 +60,8 @@ struct IcebergScanTask final : public reader::ScanTask {
// FileReader 完成 data file 物理读取,不继承具体文件格式 reader。
class IcebergTableReader : public reader::TableReader {
public:
- IcebergTableReader() = default;
-
~IcebergTableReader() override = default;
- // 初始化一次 Iceberg table scan。
- // options 必须一次性提供 schema、projection/filter 和 scan tasks,避免暴露
- // bind/set_tasks 等半初始化阶段。
- Status init(reader::TableReadOptions options) override {
- // 一次性保存 Iceberg table scan 所需输入。TableReader 负责 reader 切换流程;
- // IcebergTableReader 只提供后续要打开的 task 以及 table/file schema 映射语义。
- return reader::TableReader::init(std::move(options));
- }
-
- // 关闭当前 Iceberg scan。
- // 先让 TableReader 关闭当前 task reader,再释放 IcebergTableReader 持有的底层
- // FileReader。
- Status close() override {
- RETURN_IF_ERROR(reader::TableReader::close());
- _data_reader.reset();
- return Status::OK();
- }
-
protected:
// 将 file-local block 转换为 table/global schema block。
// 这里执行 ColumnMapping 中的 finalize_expr、缺失列填充、partition/generated 列
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]