This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new f19f78aa940 [feature](be) Build table filters from conjuncts (#63733)
f19f78aa940 is described below

commit f19f78aa940271a5125e0a0beebe0be530df3437
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 16:16:57 2026 +0800

    [feature](be) Build table filters from conjuncts (#63733)
---
 be/src/exprs/vslot_ref.h                    |   4 +-
 be/src/format/reader/column_mapper.cpp      |  74 +++++++++-
 be/src/format/reader/table_reader.cpp       |  61 +++++++-
 be/src/format/reader/table_reader.h         |   8 ++
 be/test/format/reader/table_reader_test.cpp | 214 +++++++++++++++++++++++++---
 5 files changed, 331 insertions(+), 30 deletions(-)

diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 6e7197f4cf6..8cb26f9bcfd 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -75,7 +75,9 @@ public:
 
 protected:
     VSlotRef(int slot_id, int column_id, int column_uniq_id)
-            : _slot_id(slot_id), _column_id(column_id), 
_column_uniq_id(column_uniq_id) {}
+            : _slot_id(slot_id), _column_id(column_id), 
_column_uniq_id(column_uniq_id) {
+        set_node_type(TExprNodeType::SLOT_REF);
+    }
 
 private:
     int _slot_id;
diff --git a/be/src/format/reader/column_mapper.cpp b/be/src/format/reader/column_mapper.cpp
index b2453dbbfaf..5790517f7bb 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "common/status.h"
+#include "core/assert_cast.h"
 #include "format/reader/expr/cast.h"
 #include "format/reader/expr/slot_ref.h"
 #include "format/reader/file_reader.h"
@@ -28,6 +29,35 @@
 
 namespace doris::reader {
 
+static VExprSPtr rewrite_table_expr_to_file_expr(
+        const VExprSPtr& expr, const std::map<int32_t, size_t>& 
table_column_to_file_position) {
+    if (expr == nullptr) {
+        return nullptr;
+    }
+    if (expr->is_slot_ref()) {
+        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
+        const auto position_it = 
table_column_to_file_position.find(slot_ref->slot_id());
+        if (position_it != table_column_to_file_position.end()) {
+            return TableSlotRef::create_shared(slot_ref->slot_id(),
+                                               
cast_set<int>(position_it->second), -1,
+                                               slot_ref->data_type(), 
slot_ref->expr_name());
+        }
+        return expr;
+    }
+
+    // VExpr currently does not provide a generic deep-clone API for arbitrary 
expression types.
+    // Keep all slot-localization mutation inside ColumnMapper and rebuild it 
for every split
+    // before the localized expression is prepared/opened by TableReader.
+    VExprSPtrs rewritten_children;
+    rewritten_children.reserve(expr->children().size());
+    for (const auto& child : expr->children()) {
+        rewritten_children.push_back(
+                rewrite_table_expr_to_file_expr(child, 
table_column_to_file_position));
+    }
+    expr->set_children(std::move(rewritten_children));
+    return expr;
+}
+
 static constexpr const char* ROW_LINEAGE_ROW_ID = "_row_id";
 static constexpr const char* ROW_LINEAGE_LAST_UPDATED_SEQ_NUMBER = 
"_last_updated_sequence_number";
 
@@ -56,6 +86,21 @@ static void rebuild_projection(ColumnMapping* mapping, size_t block_position) {
     mapping->projection = VExprContext::create_shared(expr);
 }
 
+static std::map<int32_t, size_t> build_file_position_map(
+        const std::vector<ColumnMapping>& mappings, const FileScanRequest& 
file_request) {
+    std::map<int32_t, size_t> table_column_to_file_position;
+    for (const auto& mapping : mappings) {
+        if (!mapping.file_column_id.has_value()) {
+            continue;
+        }
+        const auto position_it = 
file_request.column_positions.find(*mapping.file_column_id);
+        if (position_it != file_request.column_positions.end()) {
+            table_column_to_file_position.emplace(mapping.table_column_id, 
position_it->second);
+        }
+    }
+    return table_column_to_file_position;
+}
+
 Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& 
projected_columns,
                                          const std::map<std::string, Field>& 
partition_values,
                                          const std::vector<SchemaField>& 
file_schema) {
@@ -102,7 +147,8 @@ Status TableColumnMapper::create_mapping(const std::vector<TableColumn>& project
 Status TableColumnMapper::create_scan_request(const std::map<int32_t, 
TableFilter>& table_filters,
                                               const std::vector<TableColumn>& 
projected_columns,
                                               FileScanRequest* file_request) {
-    // 真实实现会把 table projection/filter 转换成 file-local projection/filter。
+    // FileReader evaluates expressions against a file-local block. This 
mapper owns the
+    // table-column to file-column conversion, so it also owns the file-local 
block positions.
     file_request->predicate_columns.clear();
     file_request->non_predicate_columns.clear();
     file_request->column_positions.clear();
@@ -141,15 +187,29 @@ Status TableColumnMapper::localize_filters(const std::map<int32_t, TableFilter>&
         if (!it.second.can_be_localized()) {
             // TODO: Rewrite table filter to reader_expression_map
             // 
file_request->reader_expression_map.emplace_back(mapping->table_column_id, 
it.second.conjunct);
-        } else {
-            FileLocalFilter local_filter;
-            local_filter.file_column_id = *mapping->file_column_id;
-            local_filter.conjunct = it.second.conjunct;
-            local_filter.predicates = it.second.predicates;
-            file_request->local_filters.push_back(std::move(local_filter));
+            continue;
         }
         add_scan_column(file_request, *mapping->file_column_id, 
&file_request->predicate_columns);
     }
+
+    // Build the complete table-slot to file-block position map after all 
predicate columns have
+    // been assigned. This keeps expression localization independent from 
filter iteration order.
+    const auto table_column_to_file_position = 
build_file_position_map(_mappings, *file_request);
+    for (const auto& it : table_filters) {
+        const auto* mapping = _find_mapping(it.first);
+        if (mapping == nullptr || !mapping->file_column_id.has_value() ||
+            !it.second.can_be_localized()) {
+            continue;
+        }
+        FileLocalFilter local_filter;
+        local_filter.file_column_id = *mapping->file_column_id;
+        if (it.second.conjunct != nullptr) {
+            local_filter.conjunct = 
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
+                    it.second.conjunct->root(), 
table_column_to_file_position));
+        }
+        local_filter.predicates = it.second.predicates;
+        file_request->local_filters.push_back(std::move(local_filter));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/format/reader/table_reader.cpp b/be/src/format/reader/table_reader.cpp
index 13f093228e6..f6cfa21600e 100644
--- a/be/src/format/reader/table_reader.cpp
+++ b/be/src/format/reader/table_reader.cpp
@@ -20,15 +20,54 @@
 #include <gen_cpp/PlanNodes_types.h>
 #include <gen_cpp/Types_types.h>
 
+#include <set>
 #include <vector>
 
 #include "common/status.h"
+#include "core/assert_cast.h"
+#include "exprs/vslot_ref.h"
 #include "format/new_parquet/parquet_reader.h"
 #include "format/reader/column_mapper.h"
 #include "format/table/deletion_vector_reader.h"
 #include "io/io_common.h"
 
 namespace doris::reader {
+namespace {
+
+void collect_table_slot_ids(const VExprSPtr& expr, std::set<int>* slot_ids) {
+    if (expr == nullptr) {
+        return;
+    }
+    if (expr->is_slot_ref()) {
+        const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
+        slot_ids->insert(slot_ref->slot_id());
+    }
+    for (const auto& child : expr->children()) {
+        collect_table_slot_ids(child, slot_ids);
+    }
+}
+
+void build_table_filters_from_conjunct(const VExprSPtr& conjunct,
+                                       std::map<int32_t, TableFilter>* 
table_filters) {
+    if (conjunct == nullptr) {
+        return;
+    }
+    std::set<int> slot_ids;
+    collect_table_slot_ids(conjunct, &slot_ids);
+    if (slot_ids.size() == 1) {
+        (*table_filters)[*slot_ids.begin()].conjunct = 
VExprContext::create_shared(conjunct);
+        return;
+    }
+    if (conjunct->node_type() == TExprNodeType::COMPOUND_PRED &&
+        conjunct->op() == TExprOpcode::COMPOUND_AND) {
+        for (const auto& child : conjunct->children()) {
+            build_table_filters_from_conjunct(child, table_filters);
+        }
+        return;
+    }
+}
+
+} // namespace
 
 std::shared_ptr<io::FileSystemProperties> create_system_properties(
         const TFileScanRangeParams* scan_params) {
@@ -58,9 +97,27 @@ Status TableReader::init(TableReadOptions options) {
     _profile = std::move(options.profile);
     TableColumnMapperOptions mapper_options;
     mapper_options.mode = TableColumnMappingMode::BY_FIELD_ID;
+    mapper_options.allow_missing_columns = options.allow_missing_columns;
     _data_reader.column_mapper = TableColumnMapper(mapper_options);
-    // TODO:
-    // _table_filters = build_table_filters_from_conjuncts(options.conjuncts);
+    _conjuncts = std::move(options.conjuncts);
+    return Status::OK();
+}
+
+Status TableReader::_build_table_filters_from_conjuncts() {
+    _table_filters.clear();
+    build_table_filters_from_conjunct(_conjuncts.root(), &_table_filters);
+    return Status::OK();
+}
+
+Status TableReader::_open_local_filter_exprs(const FileScanRequest& 
file_request) {
+    RowDescriptor row_desc;
+    for (const auto& local_filter : file_request.local_filters) {
+        if (local_filter.conjunct == nullptr) {
+            continue;
+        }
+        RETURN_IF_ERROR(local_filter.conjunct->prepare(_runtime_state, 
row_desc));
+        RETURN_IF_ERROR(local_filter.conjunct->open(_runtime_state));
+    }
     return Status::OK();
 }
 
diff --git a/be/src/format/reader/table_reader.h b/be/src/format/reader/table_reader.h
index 53791747faf..4f28c4e1aaa 100644
--- a/be/src/format/reader/table_reader.h
+++ b/be/src/format/reader/table_reader.h
@@ -107,6 +107,7 @@ struct TableReadOptions {
     std::shared_ptr<io::IOContext> io_ctx;
     RuntimeState* runtime_state;
     RuntimeProfile* scanner_profile;
+    const bool allow_missing_columns = true;
 
     std::unique_ptr<ReadProfile> profile;
 };
@@ -219,10 +220,12 @@ protected:
         
RETURN_IF_ERROR(_data_reader.column_mapper.create_mapping(_projected_columns,
                                                                   
_partition_values, file_schema));
         DORIS_CHECK(_data_reader.column_mapper.mappings().size() == 
_projected_columns.size());
+        RETURN_IF_ERROR(_build_table_filters_from_conjuncts());
 
         auto file_request = std::make_unique<FileScanRequest>();
         RETURN_IF_ERROR(_data_reader.column_mapper.create_scan_request(
                 _table_filters, _projected_columns, file_request.get()));
+        RETURN_IF_ERROR(_open_local_filter_exprs(*file_request));
         _data_reader.scan_schema.clear();
         _data_reader.block_template.clear();
         _data_reader.scan_schema.resize(file_request->column_positions.size());
@@ -242,12 +245,16 @@ protected:
         return Status::OK();
     }
 
+    Status _build_table_filters_from_conjuncts();
+    Status _open_local_filter_exprs(const FileScanRequest& file_request);
+
     // 关闭当前具体 reader。
     // 该 hook 会被 create_next_reader 和 close 调用;实现应保持幂等。
     virtual Status close_current_reader() {
         RETURN_IF_ERROR(_data_reader.reader->close());
         _data_reader.reader.reset();
         _data_reader.column_mapper.clear();
+        _table_filters.clear();
         _data_reader.block_schema.clear();
         _data_reader.scan_schema.clear();
         _data_reader.block_template.clear();
@@ -314,6 +321,7 @@ protected:
     // partition key -> value
     std::map<std::string, Field> _partition_values;
     std::map<int32_t, TableFilter> _table_filters;
+    VExprContext _conjuncts {nullptr};
     std::unique_ptr<ReadProfile> _profile;
     // Parsed from DELETION_VECTOR in Iceberg and Paimon
     DeleteRows* _delete_rows;
diff --git a/be/test/format/reader/table_reader_test.cpp b/be/test/format/reader/table_reader_test.cpp
index 84c5700fc4c..dc2e26f35ea 100644
--- a/be/test/format/reader/table_reader_test.cpp
+++ b/be/test/format/reader/table_reader_test.cpp
@@ -33,12 +33,48 @@
 #include "core/column/column_vector.h"
 #include "core/data_type/data_type_number.h"
 #include "core/data_type/data_type_string.h"
+#include "exprs/vexpr.h"
+#include "format/reader/expr/slot_ref.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/runtime_state.h"
 
 namespace doris::reader {
 namespace {
 
+class TableInt32GreaterThanExpr final : public VExpr {
+public:
+    TableInt32GreaterThanExpr(int slot_id, int column_id, int32_t value)
+            : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {
+        add_child(TableSlotRef::create_shared(slot_id, column_id, -1,
+                                              
std::make_shared<DataTypeInt32>(), "id"));
+        set_node_type(TExprNodeType::BINARY_PRED);
+        _opcode = TExprOpcode::GT;
+    }
+
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        const auto* slot_ref = assert_cast<const 
VSlotRef*>(get_child(0).get());
+        const auto& input =
+                assert_cast<const ColumnInt32&>(
+                        *block->get_by_position(slot_ref->column_id()).column);
+        auto result = ColumnUInt8::create();
+        auto& result_data = result->get_data();
+        result_data.resize(count);
+        for (size_t row = 0; row < count; ++row) {
+            const size_t input_row = selector == nullptr ? row : 
(*selector)[row];
+            result_data[row] = input.get_element(input_row) > _value;
+        }
+        result_column = std::move(result);
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    const int32_t _value;
+    const std::string _expr_name = "TableInt32GreaterThanExpr";
+};
+
 std::shared_ptr<arrow::Array> finish_array(arrow::ArrayBuilder* builder) {
     std::shared_ptr<arrow::Array> array;
     EXPECT_TRUE(builder->Finish(&array).ok());
@@ -97,6 +133,14 @@ SplitReadOptions build_split_options(const std::string& file_path) {
     return options;
 }
 
+TableColumn make_table_column(ColumnId id, const std::string& name, const 
DataTypePtr& type) {
+    TableColumn column;
+    column.id = id;
+    column.name = name;
+    column.type = type;
+    return column;
+}
+
 TEST(TableReaderTest, ReopenSplitAfterClose) {
     const auto test_dir = std::filesystem::temp_directory_path() / 
"doris_table_reader_test";
     std::filesystem::remove_all(test_dir);
@@ -112,21 +156,23 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
     write_parquet_file(file_paths[2], 3, "three");
 
     std::vector<TableColumn> projected_columns;
-    projected_columns.push_back({.id = 0, .name = "id", .type = 
std::make_shared<DataTypeInt32>()});
-    projected_columns.push_back(
-            {.id = 1, .name = "value", .type = 
std::make_shared<DataTypeString>()});
+    projected_columns.push_back(make_table_column(1, "value", 
std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
     ASSERT_TRUE(reader
                         .init({
                                 .projected_columns = projected_columns,
-                                .conjuncts = VExprContext(nullptr),
+                                .conjuncts = VExprContext(
+                                        
std::make_shared<TableInt32GreaterThanExpr>(0, 0, 0)),
                                 .format = FileFormat::PARQUET,
                                 .scan_params = nullptr,
                                 .io_ctx = nullptr,
                                 .runtime_state = &state,
                                 .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
                         })
                         .ok());
 
@@ -134,6 +180,9 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
     // init() once, then repeat prepare_split() -> get_block() -> close().
     // This verifies TableReader::close() fully releases the previous 
low-level reader and task
     // state, so a later prepare_split() can open and read a new split on the 
same TableReader.
+    // The table-level conjunct is also rebuilt for each split. The projection 
order puts value
+    // before id, so the pushed conjunct has to be rewritten to the 
ParquetReader file-local block
+    // position every time a new split is opened.
     std::vector<int32_t> ids;
     std::vector<std::string> values;
     for (const auto& file_path : file_paths) {
@@ -145,9 +194,9 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
         ASSERT_TRUE(reader.get_block(&block, &eos).ok());
         ASSERT_FALSE(eos);
 
-        const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
         const auto& value_column =
-                assert_cast<const 
ColumnString&>(*block.get_by_position(1).column);
+                assert_cast<const 
ColumnString&>(*block.get_by_position(0).column);
+        const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
         ASSERT_EQ(id_column.size(), 1);
         ASSERT_EQ(value_column.size(), 1);
         ids.push_back(id_column.get_element(0));
@@ -162,7 +211,81 @@ TEST(TableReaderTest, ReopenSplitAfterClose) {
     std::filesystem::remove_all(test_dir);
 }
 
-TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) {
+TEST(TableReaderTest, OpenReaderBuildsTableFiltersFromConjuncts) {
+    const auto test_dir =
+            std::filesystem::temp_directory_path() / 
"doris_table_reader_conjunct_filter_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_parquet_file(file_path, 3, "three");
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(make_table_column(1, "value", 
std::make_shared<DataTypeString>()));
+    projected_columns.push_back(make_table_column(0, "id", 
std::make_shared<DataTypeInt32>()));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader
+                        .init({
+                                .projected_columns = projected_columns,
+                                .conjuncts = 
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
+                                        0, 0, 2)),
+                                .format = FileFormat::PARQUET,
+                                .scan_params = nullptr,
+                                .io_ctx = nullptr,
+                                .runtime_state = &state,
+                                .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
+                        })
+                        .ok());
+
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    // open_reader() should convert the table-level conjunct on projected 
column id 0 into
+    // _table_filters before ColumnMapper creates the FileScanRequest. 
ColumnMapper then rewrites
+    // the conjunct's slot ref from table column id 0 to the file-local block 
position used by
+    // ParquetReader. The projection order intentionally puts value before id, 
so the id filter
+    // column is not at position 0 in the file block.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(1).column);
+    ASSERT_EQ(id_column.size(), 1);
+    EXPECT_EQ(id_column.get_element(0), 3);
+
+    ASSERT_TRUE(reader.close().ok());
+
+    TableReader filtered_reader;
+    ASSERT_TRUE(filtered_reader
+                        .init({
+                                .projected_columns = projected_columns,
+                                .conjuncts = 
VExprContext(std::make_shared<TableInt32GreaterThanExpr>(
+                                        0, 0, 4)),
+                                .format = FileFormat::PARQUET,
+                                .scan_params = nullptr,
+                                .io_ctx = nullptr,
+                                .runtime_state = &state,
+                                .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
+                        })
+                        .ok());
+    
ASSERT_TRUE(filtered_reader.prepare_split(build_split_options(file_path)).ok());
+
+    block = build_table_block(projected_columns);
+    eos = false;
+    ASSERT_TRUE(filtered_reader.get_block(&block, &eos).ok());
+    EXPECT_TRUE(eos);
+    EXPECT_EQ(block.get_by_position(1).column->size(), 0);
+
+    ASSERT_TRUE(filtered_reader.close().ok());
+    std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, ProjectedColumnsFillDefaultForParquetSchemaMismatch) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_schema_mismatch_test";
     std::filesystem::remove_all(test_dir);
@@ -173,7 +296,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) {
 
     std::vector<TableColumn> projected_columns;
     projected_columns.push_back(
-            {.id = 99, .name = "missing_value", .type = 
std::make_shared<DataTypeString>()});
+            make_table_column(99, "missing_value", 
std::make_shared<DataTypeString>()));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -186,14 +309,59 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) {
                                 .io_ctx = nullptr,
                                 .runtime_state = &state,
                                 .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
                         })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
 
     // The table projection asks for field id 99, but the ParquetReader 
exposes only file-local
-    // fields 0 and 1. get_block() opens the split lazily, so this is where 
TableReader must reject
-    // the mismatch between TableReadOptions::projected_columns and the 
Parquet file schema.
+    // fields 0 and 1. Missing columns are allowed by the current mapper 
options, so TableReader
+    // should still use the Parquet row count and fill a default column in 
table schema.
+    Block block = build_table_block(projected_columns);
+    bool eos = false;
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+    EXPECT_EQ(block.get_by_position(0).column->size(), 1);
+
+    ASSERT_TRUE(reader.close().ok());
+    std::filesystem::remove_all(test_dir);
+}
+
+TEST(TableReaderTest, 
ProjectedColumnsRejectParquetSchemaMismatchWhenMissingColumnsDisallowed) {
+    const auto test_dir = std::filesystem::temp_directory_path() /
+                          "doris_table_reader_schema_mismatch_reject_test";
+    std::filesystem::remove_all(test_dir);
+    std::filesystem::create_directories(test_dir);
+
+    const auto file_path = (test_dir / "split.parquet").string();
+    write_parquet_file(file_path, 1, "one");
+
+    std::vector<TableColumn> projected_columns;
+    projected_columns.push_back(
+            make_table_column(99, "missing_value", 
std::make_shared<DataTypeString>()));
+
+    RuntimeState state {TQueryOptions(), TQueryGlobals()};
+    TableReader reader;
+    ASSERT_TRUE(reader
+                        .init({
+                                .projected_columns = projected_columns,
+                                .conjuncts = VExprContext(nullptr),
+                                .format = FileFormat::PARQUET,
+                                .scan_params = nullptr,
+                                .io_ctx = nullptr,
+                                .runtime_state = &state,
+                                .scanner_profile = nullptr,
+                                .allow_missing_columns = false,
+                                .profile = nullptr,
+                        })
+                        .ok());
+
+    ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
+
+    // With allow_missing_columns disabled, the same missing projected column 
should fail while
+    // opening the split instead of being materialized as a default column.
     Block block = build_table_block(projected_columns);
     bool eos = false;
     const auto status = reader.get_block(&block, &eos);
@@ -204,7 +372,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectParquetSchemaMismatch) {
     std::filesystem::remove_all(test_dir);
 }
 
-TEST(TableReaderTest, 
ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMismatch) {
+TEST(TableReaderTest, 
ProjectedColumnsUseMapperExpressionForSameNameDifferentIdParquetSchema) {
     const auto test_dir =
             std::filesystem::temp_directory_path() / 
"doris_table_reader_same_name_diff_id_test";
     std::filesystem::remove_all(test_dir);
@@ -214,8 +382,7 @@ TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMism
     write_parquet_file(file_path, 1, "one");
 
     std::vector<TableColumn> projected_columns;
-    projected_columns.push_back(
-            {.id = 99, .name = "id", .type = 
std::make_shared<DataTypeInt32>()});
+    projected_columns.push_back(make_table_column(99, "id", 
std::make_shared<DataTypeInt32>()));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -228,19 +395,24 @@ TEST(TableReaderTest, ProjectedColumnsRejectSameNameDifferentIdParquetSchemaMism
                                 .io_ctx = nullptr,
                                 .runtime_state = &state,
                                 .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
                         })
                         .ok());
 
     ASSERT_TRUE(reader.prepare_split(build_split_options(file_path)).ok());
 
     // The table column has the same name as the Parquet field, but a 
different field id.
-    // TableReader configures ColumnMapper in BY_FIELD_ID mode, so the name 
match must not hide
-    // the id mismatch.
+    // ColumnMapper should still resolve it by name and build a SlotRef 
projection from the file
+    // column into the requested table column.
     Block block = build_table_block(projected_columns);
     bool eos = false;
-    const auto status = reader.get_block(&block, &eos);
-    ASSERT_FALSE(status.ok());
-    EXPECT_NE(status.to_string().find("does not have a matching file column"), 
std::string::npos);
+    ASSERT_TRUE(reader.get_block(&block, &eos).ok());
+    ASSERT_FALSE(eos);
+
+    const auto& id_column = assert_cast<const 
ColumnInt32&>(*block.get_by_position(0).column);
+    ASSERT_EQ(id_column.size(), 1);
+    EXPECT_EQ(id_column.get_element(0), 1);
 
     ASSERT_TRUE(reader.close().ok());
     std::filesystem::remove_all(test_dir);
@@ -257,9 +429,9 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
 
     std::vector<TableColumn> projected_columns;
     projected_columns.push_back(
-            {.id = 0, .name = "table_id", .type = 
std::make_shared<DataTypeInt64>()});
+            make_table_column(0, "table_id", 
std::make_shared<DataTypeInt64>()));
     projected_columns.push_back(
-            {.id = 1, .name = "table_value", .type = 
std::make_shared<DataTypeString>()});
+            make_table_column(1, "table_value", 
std::make_shared<DataTypeString>()));
 
     RuntimeState state {TQueryOptions(), TQueryGlobals()};
     TableReader reader;
@@ -272,6 +444,8 @@ TEST(TableReaderTest, ProjectedColumnsUseMapperExpressionsForParquetSchemaMismat
                                 .io_ctx = nullptr,
                                 .runtime_state = &state,
                                 .scanner_profile = nullptr,
+                                .allow_missing_columns = true,
+                                .profile = nullptr,
                         })
                         .ok());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to