This is an automated email from the ASF dual-hosted git repository.
Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/refact_reader_branch by this push:
new f19f78aa940 [feature](be) Build table filters from conjuncts (#63733)
f19f78aa940 is described below
commit f19f78aa940271a5125e0a0beebe0be530df3437
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 16:16:57 2026 +0800
[feature](be) Build table filters from conjuncts (#63733)
---
be/src/exprs/vslot_ref.h | 4 +-
be/src/format/reader/column_mapper.cpp | 74 +++++++++-
be/src/format/reader/table_reader.cpp | 61 +++++++-
be/src/format/reader/table_reader.h | 8 ++
be/test/format/reader/table_reader_test.cpp | 214 +++++++++++++++++++++++++---
5 files changed, 331 insertions(+), 30 deletions(-)
diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 6e7197f4cf6..8cb26f9bcfd 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -75,7 +75,9 @@ public:
protected:
VSlotRef(int slot_id, int column_id, int column_uniq_id)
- : _slot_id(slot_id), _column_id(column_id),
_column_uniq_id(column_uniq_id) {}
+ : _slot_id(slot_id), _column_id(column_id),
_column_uniq_id(column_uniq_id) {
+ set_node_type(TExprNodeType::SLOT_REF);
+ }
private:
int _slot_id;
diff --git a/be/src/format/reader/column_mapper.cpp
b/be/src/format/reader/column_mapper.cpp
index b2453dbbfaf..5790517f7bb 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -21,6 +21,7 @@
#include <vector>
#include "common/status.h"
+#include "core/assert_cast.h"
#include "format/reader/expr/cast.h"
#include "format/reader/expr/slot_ref.h"
#include "format/reader/file_reader.h"
@@ -28,6 +29,35 @@
namespace doris::reader {
+static VExprSPtr rewrite_table_expr_to_file_expr(
+ const VExprSPtr& expr, const std::map<int32_t, size_t>&
table_column_to_file_position) {
+ if (expr == nullptr) {
+ return nullptr;
+ }
+ if (expr->is_slot_ref()) {
+ const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
+ const auto position_it =
table_column_to_file_position.find(slot_ref->slot_id());
+ if (position_it != table_column_to_file_position.end()) {
+ return TableSlotRef::create_shared(slot_ref->slot_id(),
+
cast_set<int>(position_it->second), -1,
+ slot_ref->data_type(),
slot_ref->expr_name());
+ }
+ return expr;
+ }
+
+ // VExpr currently does not provide a generic deep-clone API for arbitrary expression types.
+ // Keep all slot-localization mutation inside ColumnMapper and rebuild it for every split
+ // before the localized expression is prepared/opened by TableReader.
+ VExprSPtrs rewritten_children;
+ rewritten_children.reserve(expr->children().size());
+ for (const auto& child : expr->children()) {
+ rewritten_children.push_back(
+ rewrite_table_expr_to_file_expr(child,
table_column_to_file_position));
+ }
+ expr->set_children(std::move(rewritten_children));
+ return expr;
+}
+
static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER =
"_last_updated_sequence_number";
@@ -56,6 +86,21 @@ static void rebuild_projection(ColumnMapping* mapping,
size_t block_position) {
mapping->projection = VExprContext::create_shared(expr);
}
+static std::map<int32_t, size_t> build_file_position_map(
+ const std::vector<ColumnMapping>& mappings, const FileScanRequest&
file_request) {
+ std::map<int32_t, size_t> table_column_to_file_position;
+ for (const auto& mapping : mappings) {
+ if (!mapping.file_column_id.has_value()) {
+ continue;
+ }
+ const auto position_it =
file_request.column_positions.find(*mapping.file_column_id);
+ if (position_it != file_request.column_positions.end()) {
+ table_column_to_file_position.emplace(mapping.table_column_id,
position_it->second);
+ }
+ }
+ return table_column_to_file_position;
+}
+
Status TableColumnMapper::create_mapping(const std::vector<TableColumn>&
projected_columns,
const std::map<std::string, Field>&
partition_values,
const std::vector<SchemaField>&
file_schema) {
@@ -102,7 +147,8 @@ Status TableColumnMapper::create_mapping(const
std::vector<TableColumn>& project
Status TableColumnMapper::create_scan_request(const std::map<int32_t,
TableFilter>& table_filters,
const std::vector<TableColumn>&
projected_columns,
FileScanRequest* file_request) {
- // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
+ // FileReader evaluates expressions against a file-local block. This mapper owns the
+ // table-column to file-column conversion, so it also owns the file-local block positions.
file_request->predicate_columns.clear();
file_request->non_predicate_columns.clear();
file_request->column_positions.clear();
@@ -141,15 +187,29 @@ Status TableColumnMapper::localize_filters(const
std::map<int32_t, TableFilter>&
if (!it.second.can_be_localized()) {
// TODO: Rewrite table filter to reader_expression_map
//
file_request->reader_expression_map.emplace_back(mapping->table_column_id,
it.second.conjunct);
- } else {
- FileLocalFilter local_filter;
- local_filter.file_column_id = *mapping->file_column_id;
- local_filter.conjunct = it.second.conjunct;
- local_filter.predicates = it.second.predicates;
- file_request->local_filters.push_back(std::move(local_filter));
+ continue;
}
add_scan_column(file_request, *mapping->file_column_id,
&file_request->predicate_columns);
}
+
+ // Build the complete table-slot to file-block position map after all predicate columns have
+ // been assigned. This keeps expression localization independent from filter iteration order.
+ const auto table_column_to_file_position =
build_file_position_map(_mappings, *file_request);
+ for (const auto& it : table_filters) {
+ const auto* mapping = _find_mapping(it.first);
+ if (mapping == nullptr || !mapping->file_column_id.has_value() ||
+ !it.second.can_be_localized()) {
+ continue;
+ }
+ FileLocalFilter local_filter;
+ local_filter.file_column_id = *mapping->file_column_id;
+ if (it.second.conjunct != nullptr) {
+ local_filter.conjunct =
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
+ it.second.conjunct->root(),
table_column_to_file_position));
+ }
+ local_filter.predicates = it.second.predicates;
+ file_request->local_filters.push_back(std::move(local_filter));
+ }
return Status::OK();
}
diff --git a/be/src/format/reader/table_reader.cpp
b/be/src/format/reader/table_reader.cpp
index 13f093228e6..f6cfa21600e 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -20,15 +20,54 @@
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Types_types.h>
+#include <set>
#include <vector>
#include "common/status.h"
+#include "core/assert_cast.h"
+#include "exprs/vslot_ref.h"
#include "format/new_parquet/parquet_reader.h"
#include "format/reader/column_mapper.h"
#include "format/table/deletion_vector_reader.h"
#include "io/io_common.h"
namespace doris::reader {
+namespace {
+
+void collect_table_slot_ids(const VExprSPtr& expr, std::set<int>* slot_ids) {
+ if (expr == nullptr) {
+ return;
+ }
+ if (expr->is_slot_ref()) {
+ const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
+ slot_ids->insert(slot_ref->slot_id());
+ }
+ for (const auto& child : expr->children()) {
+ collect_table_slot_ids(child, slot_ids);
+ }
+}
+
+void build_table_filters_from_conjunct(const VExprSPtr& conjunct,
+ std::map<int32_t, TableFilter>*
table_filters) {
+ if (conjunct == nullptr) {
+ return;
+ }
+ std::set<int> slot_ids;
+ collect_table_slot_ids(conjunct, &slot_ids);
+ if (slot_ids.size() == 1) {
+ (*table_filters)[*slot_ids.begin()].conjunct =
VExprContext::create_shared(conjunct);
+ return;
+ }
+ if (conjunct->node_type() == TExprNodeType::COMPOUND_PRED &&
+ conjunct->op() == TExprOpcode::COMPOUND_AND) {
+ for (const auto& child : conjunct->children()) {
+ build_table_filters_from_conjunct(child, table_filters);
+ }
+ return;
+ }
+}
+
+} // namespace
std::shared_ptr<io::FileSystemProperties> create_system_properties(
const TFileScanRangeParams* scan_params) {
@@ -58,9 +97,27 @@ Status TableReader::init(TableReadOptions options) {
_profile = std::move(options.profile);
TableColumnMapperOptions mapper_options;
mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+ mapper_options.allow_missing_columns = options.allow_missing_columns;
_data_reader.column_mapper = TableColumnMapper(mapper_options);
- // TODO:
- // _table_filters = build_table_filters_from_conjuncts(options.conjuncts);
+ _conjuncts = std::move(options.conjuncts);
+ return Status::OK();
+}
+
+Status TableReader::_build_table_filters_from_conjuncts() {
+ _table_filters.clear();
+ build_table_filters_from_conjunct(_conjuncts.root(), &_table_filters);
+ return Status::OK();
+}
+
+Status TableReader::_open_local_filter_exprs(const FileScanRequest&
file_request) {
+ RowDescriptor row_desc;
+ for (const auto& local_filter : file_request.local_filters) {
+ if (local_filter.conjunct == nullptr) {
+ continue;
+ }
+ RETURN_IF_ERROR(local_filter.conjunct->prepare(_runtime_state,
row_desc));
+ RETURN_IF_ERROR(local_filter.conjunct->open(_runtime_state));
+ }
return Status::OK();
}
diff --git a/be/src/format/reader/table_reader.h
b/be/src/format/reader/table_reader.h
index 53791747faf..4f28c4e1aaa 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -107,6 +107,7 @@ struct TableReadOptions {
std::shared_ptr<io::IOContext> io_ctx;
RuntimeState* runtime_state;
RuntimeProfile* scanner_profile;
+ const bool allow_missing_columns = true;
std::unique_ptr<ReadProfile> profile;
};
@@ -219,10 +220,12 @@ protected:
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
_partition_values, file_schema));
DORIS_CHECK(_data_reader.column_mapper.mappings().size() ==
_projected_columns.size());
+ RETURN_IF_ERROR(_build_table_filters_from_conjuncts());
auto file_request = std::make_unique<FileScanRequest>();
RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
_table_filters, _projected_columns, file_request.get()));
+ RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
_data_reader.scan_schema.resize(file_request->column_positions.size());
@@ -242,12 +245,16 @@ protected:
return Status::OK();
}
+ Status _build_table_filters_from_conjuncts();
+ Status _open_local_filter_exprs(const FileScanRequest& file_request);
+
// 关闭当前具体 reader。
// 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
virtual Status close_current_reader() {
RETURN_IF_ERROR(_data_reader.reader->close());
_data_reader.reader.reset();
_data_reader.column_mapper.clear();
+ _table_filters.clear();
_data_reader.block_schema.clear();
_data_reader.scan_schema.clear();
_data_reader.block_template.clear();
@@ -314,6 +321,7 @@ protected:
// partition key -> value
std::map<std::string, Field> _partition_values;
std::map<int32_t, TableFilter> _table_filters;
+ VExprContext _conjuncts {nullptr};
std::unique_ptr<ReadProfile> _profile;
// Parsed from DELETION_VECTOR in Iceberg and Paimon
DeleteRows* _delete_rows;
diff --git a/be/test/format/reader/table_reader_test.cpp
b/be/test/format/reader/table_reader_test.cpp
index 84c5700fc4c..dc2e26f35ea 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -33,12 +33,48 @@
#include "core/column/column_vector.h"
#include "core/data_type/data_type_number.h"
#include "core/data_type/data_type_string.h"
+#include "exprs/vexpr.h"
+#include "format/reader/expr/slot_ref.h"
#include "gen_cpp/PlanNodes_types.h"
#include "runtime/runtime_state.h"
namespace doris::reader {
namespace {
+class TableInt32GreaterThanExpr final : public VExpr {
+public:
+ TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value)
+ : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
+ add_child(TableSlotRef::create_shared(slot_id, column_id, -1,
+
std::make_shared<DataTypeInt32>(), "id"));
+ set_node_type(TExprNodeType::BINARY_PRED);
+ _opcode = TExprOpcode::GT;
+ }
+
+ Status execute_column_impl(VExprContext* context, const Block* block,
const Selector* selector,
+ size_t count, ColumnPtr& result_column) const
override {
+ const auto* slot_ref = assert_cast<const
VSlotRef*>(get_child(0).get());
+ const auto& input =
+ assert_cast<const ColumnInt32&>(
+ *block->get_by_position(slot_ref->column_id()).column);
+ auto result = ColumnUInt8::create();
+ auto& result_data = result->get_data();
+ result_data.resize(count);
+ for (size_t row = 0; row < count; ++row) {
+ const size_t input_row = selector == nullptr ? row :
(*selector)[row];
+ result_data[row] = input.get_element(input_row) > _value;
+ }
+ result_column = std::move(result);
+ return Status::OK();
+ }
+
+ const std::string& expr_name() const override { return _expr_name; }
+
+private:
+ const int32_t _value;
+ const std::string _expr_name = "TableInt32GreaterThanExpr";
+};
+
std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
std::shared_ptr<arrow::Array> array;
EXPECT_TRUE(builder->Finish(&array).ok());
@@ -97,6 +133,14 @@ SplitReadOptions build_split_options(const std::string&
file_path) {
return options;
}
+TableColumn make_table_column(ColumnId id, const std::string& name, const
DataTypePtr& type) {
+ TableColumn column;
+ column.id = id;
+ column.name = name;
+ column.type = type;
+ return column;
+}
+
TEST(TableReaderTest, ReopenSplitAfterClose) {
const auto test_dir = std::filesystem::temp_directory_path() /
"doris_table_reader_test";
std::filesystem::remove_all(test_dir);
@@ -112,21 +156,23 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
write_parquet_file(file_paths[2], 3, "three");
std::vector<TableColumn> projected_columns;
- projected_columns.push_back({.id = 0, .name = "id", .type =
std::make_shared<DataTypeInt32>()});
- projected_columns.push_back(
- {.id = 1, .name = "value", .type =
std::make_shared<DataTypeString>()});
+ projected_columns.push_back(make_table_column(1, "value",
std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
ASSERT_TRUE(reader
.init({
.projected_columns = projected_columns,
- .conjuncts = VExprContext(nullptr),
+ .conjuncts = VExprContext(
+
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
.format = FileFormat::PARQUET,
.scan_params = nullptr,
.io_ctx = nullptr,
.runtime_state = &state,
.scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
})
.ok());
@@ -134,6 +180,9 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
// init() once, then repeat prepare_split() -> get_block() -> close().
// This verifies TableReader::close() fully releases the previous
low-level reader and task
// state, so a later prepare_split() can open and read a new split on the
same TableReader.
+ // The table-level conjunct is also rebuilt for each split. The projection
order puts value
+ // before id, so the pushed conjunct has to be rewritten to the
ParquetReader file-local block
+ // position every time a new split is opened.
std::vector<int32_t> ids;
std::vector<std::string> values;
for (const auto& file_path : file_paths) {
@@ -145,9 +194,9 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
ASSERT_TRUE(reader.get_block(&block, &eos).ok());
ASSERT_FALSE(eos);
- const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
const auto& value_column =
- assert_cast<const
ColumnString&>(*block.get_by_position(1).column);
+ assert_cast<const
ColumnString&>(*block.get_by_position(0).column);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
ASSERT_EQ(id_column.size(), 1);
ASSERT_EQ(value_column.size(), 1);
ids.push_back(id_column.get_element(0));
@@ -162,7 +211,81 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
std::filesystem::remove_all(test_dir);
}
-TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) {
+TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
+ const auto test_dir =
+ std::filesystem::temp_directory_path() /
"doris_table_reader_conjunct_filter_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_parquet_file(file_path, 3, "three");
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(make_table_column(1, "value",
std::make_shared<DataTypeString>()));
+ projected_columns.push_back(make_table_column(0, "id",
std::make_shared<DataTypeInt32>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader
+ .init({
+ .projected_columns = projected_columns,
+ .conjuncts =
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
+ 0, 0, 2)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // open_reader() should convert the table-level conjunct on projected
column id 0 into
+ // _table_filters before ColumnMapper creates the FileScanRequest.
ColumnMapper then rewrites
+ // the conjunct's slot ref from table column id 0 to the file-local block
position used by
+ // ParquetReader. The projection order intentionally puts value before id,
so the id filter
+ // column is not at position 0 in the file block.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(1).column);
+ ASSERT_EQ(id_column.size(), 1);
+ EXPECT_EQ(id_column.get_element(0), 3);
+
+ ASSERT_TRUE(reader.close().ok());
+
+ TableReader filtered_reader;
+ ASSERT_TRUE(filtered_reader
+ .init({
+ .projected_columns = projected_columns,
+ .conjuncts =
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
+ 0, 0, 4)),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
+ })
+ .ok());
+
ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok());
+
+ block = build_table_block(projected_columns);
+ eos = false;
+ ASSERT_TRUE(filtered_reader.get_block(&block, &eos).ok());
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(block.get_by_position(1).column->size(), 0);
+
+ ASSERT_TRUE(filtered_reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_schema_mismatch_test";
std::filesystem::remove_all(test_dir);
@@ -173,7 +296,7 @@ TEST(TableReaderTest,
ProjectedColumnsRejectParquetSchemaMismatch) {
std::vector<TableColumn> projected_columns;
projected_columns.push_back(
- {.id = 99, .name = "missing_value", .type =
std::make_shared<DataTypeString>()});
+ make_table_column(99, "missing_value",
std::make_shared<DataTypeString>()));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -186,14 +309,59 @@ TEST(TableReaderTest,
ProjectedColumnsRejectParquetSchemaMismatch) {
.io_ctx = nullptr,
.runtime_state = &state,
.scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
})
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
// The table projection asks for field id 99, but the ParquetReader
exposes only file-local
- // fields 0 and 1. get_block() opens the split lazily, so this is where
TableReader must reject
- // the mismatch between TableReadOptions::projected_columns and the
Parquet file schema.
+ // fields 0 and 1. Missing columns are allowed by the current mapper
options, so TableReader
+ // should still use the Parquet row count and fill a default column in
table schema.
+ Block block = build_table_block(projected_columns);
+ bool eos = false;
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+ EXPECT_EQ(block.get_by_position(0).column->size(), 1);
+
+ ASSERT_TRUE(reader.close().ok());
+ std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest,
ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColumnsDisallowed) {
+ const auto test_dir = std::filesystem::temp_directory_path() /
+ "doris_table_reader_schema_mismatch_reject_test";
+ std::filesystem::remove_all(test_dir);
+ std::filesystem::create_directories(test_dir);
+
+ const auto file_path = (test_dir / "split.parquet").string();
+ write_parquet_file(file_path, 1, "one");
+
+ std::vector<TableColumn> projected_columns;
+ projected_columns.push_back(
+ make_table_column(99, "missing_value",
std::make_shared<DataTypeString>()));
+
+ RuntimeState state {TQueryOptions(), TQueryGlobals()};
+ TableReader reader;
+ ASSERT_TRUE(reader
+ .init({
+ .projected_columns = projected_columns,
+ .conjuncts = VExprContext(nullptr),
+ .format = FileFormat::PARQUET,
+ .scan_params = nullptr,
+ .io_ctx = nullptr,
+ .runtime_state = &state,
+ .scanner_profile = nullptr,
+ .allow_missing_columns = false,
+ .profile = nullptr,
+ })
+ .ok());
+
+ ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+ // With allow_missing_columns disabled, the same missing projected column
should fail while
+ // opening the split instead of being materialized as a default column.
Block block = build_table_block(projected_columns);
bool eos = false;
const auto status = reader.get_block(&block, &eos);
@@ -204,7 +372,7 @@ TEST(TableReaderTest,
ProjectedColumnsRejectParquetSchemaMismatch) {
std::filesystem::remove_all(test_dir);
}
-TEST(TableReaderTest,
ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMismatch) {
+TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) {
const auto test_dir =
std::filesystem::temp_directory_path() /
"doris_table_reader_same_name_diff_id_test";
std::filesystem::remove_all(test_dir);
@@ -214,8 +382,7 @@ TEST(TableReaderTest,
ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMism
write_parquet_file(file_path, 1, "one");
std::vector<TableColumn> projected_columns;
- projected_columns.push_back(
- {.id = 99, .name = "id", .type =
std::make_shared<DataTypeInt32>()});
+ projected_columns.push_back(make_table_column(99, "id",
std::make_shared<DataTypeInt32>()));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -228,19 +395,24 @@ TEST(TableReaderTest,
ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMism
.io_ctx = nullptr,
.runtime_state = &state,
.scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
})
.ok());
ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
// The table column has the same name as the Parquet field, but a
different field id.
- // TableReader configures ColumnMapper in BY_FIELD_ID mode, so the name
match must not hide
- // the id mismatch.
+ // ColumnMapper should still resolve it by name and build a SlotRef
projection from the file
+ // column into the requested table column.
Block block = build_table_block(projected_columns);
bool eos = false;
- const auto status = reader.get_block(&block, &eos);
- ASSERT_FALSE(status.ok());
- EXPECT_NE(status.to_string().find("does not have a matching file column"),
std::string::npos);
+ ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+ ASSERT_FALSE(eos);
+
+ const auto& id_column = assert_cast<const
ColumnInt32&>(*block.get_by_position(0).column);
+ ASSERT_EQ(id_column.size(), 1);
+ EXPECT_EQ(id_column.get_element(0), 1);
ASSERT_TRUE(reader.close().ok());
std::filesystem::remove_all(test_dir);
@@ -257,9 +429,9 @@ TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
std::vector<TableColumn> projected_columns;
projected_columns.push_back(
- {.id = 0, .name = "table_id", .type =
std::make_shared<DataTypeInt64>()});
+ make_table_column(0, "table_id",
std::make_shared<DataTypeInt64>()));
projected_columns.push_back(
- {.id = 1, .name = "table_value", .type =
std::make_shared<DataTypeString>()});
+ make_table_column(1, "table_value",
std::make_shared<DataTypeString>()));
RuntimeState state {TQueryOptions(), TQueryGlobals()};
TableReader reader;
@@ -272,6 +444,8 @@ TEST(TableReaderTest,
ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
.io_ctx = nullptr,
.runtime_state = &state,
.scanner_profile = nullptr,
+ .allow_missing_columns = true,
+ .profile = nullptr,
})
.ok());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]