This is an automated email from the ASF dual-hosted git repository.

Gabriel39 pushed a commit to branch refact_reader_branch
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/refact_reader_branch by this push:
     new 5ad0921fe02 [fix](be) Cast localized filter slots for file schema types (#63754)
5ad0921fe02 is described below

commit 5ad0921fe021d5c939824b95f44b882fe678470d
Author: Gabriel <[email protected]>
AuthorDate: Wed May 27 19:12:12 2026 +0800

    [fix](be) Cast localized filter slots for file schema types (#63754)
---
 be/src/format/reader/column_mapper.cpp   | 53 +++++++++++++------
 be/src/format/reader/file_reader.h       |  2 +-
 be/test/format/reader/expr/cast_test.cpp | 89 ++++++++++++++++++++++++++++++++
 3 files changed, 127 insertions(+), 17 deletions(-)

diff --git a/be/src/format/reader/column_mapper.cpp 
b/be/src/format/reader/column_mapper.cpp
index 1a33781b965..80a81f6c76d 100644
--- a/be/src/format/reader/column_mapper.cpp
+++ b/be/src/format/reader/column_mapper.cpp
@@ -36,18 +36,33 @@
 
 namespace doris::reader {
 
+struct FileSlotRewriteInfo {
+    size_t block_position = 0;
+    DataTypePtr file_type;
+    DataTypePtr table_type;
+    std::string file_column_name;
+};
+
 static VExprSPtr rewrite_table_expr_to_file_expr(
-        const VExprSPtr& expr, const std::map<int32_t, size_t>& 
table_column_to_file_position) {
+        const VExprSPtr& expr,
+        const std::map<int32_t, FileSlotRewriteInfo>& 
table_column_to_file_slot) {
     if (expr == nullptr) {
         return nullptr;
     }
     if (expr->is_slot_ref()) {
         const auto* slot_ref = assert_cast<const VSlotRef*>(expr.get());
-        const auto position_it = 
table_column_to_file_position.find(slot_ref->slot_id());
-        if (position_it != table_column_to_file_position.end()) {
-            return TableSlotRef::create_shared(slot_ref->slot_id(),
-                                               
cast_set<int>(position_it->second), -1,
-                                               slot_ref->data_type(), 
slot_ref->expr_name());
+        const auto rewrite_it = 
table_column_to_file_slot.find(slot_ref->slot_id());
+        if (rewrite_it != table_column_to_file_slot.end()) {
+            const auto& rewrite_info = rewrite_it->second;
+            auto file_slot = TableSlotRef::create_shared(
+                    slot_ref->slot_id(), 
cast_set<int>(rewrite_info.block_position), -1,
+                    rewrite_info.file_type, rewrite_info.file_column_name);
+            if (rewrite_info.file_type->equals(*rewrite_info.table_type)) {
+                return file_slot;
+            }
+            auto cast_expr = Cast::create_shared(rewrite_info.table_type);
+            cast_expr->add_child(std::move(file_slot));
+            return cast_expr;
         }
         return expr;
     }
@@ -59,7 +74,7 @@ static VExprSPtr rewrite_table_expr_to_file_expr(
     rewritten_children.reserve(expr->children().size());
     for (const auto& child : expr->children()) {
         rewritten_children.push_back(
-                rewrite_table_expr_to_file_expr(child, 
table_column_to_file_position));
+                rewrite_table_expr_to_file_expr(child, 
table_column_to_file_slot));
     }
     expr->set_children(std::move(rewritten_children));
     return expr;
@@ -95,19 +110,25 @@ static void rebuild_projection(ColumnMapping* mapping, 
size_t block_position) {
     mapping->projection = VExprContext::create_shared(expr);
 }
 
-static std::map<int32_t, size_t> build_file_position_map(const 
std::vector<ColumnMapping>& mappings,
-                                                         const 
FileScanRequest& file_request) {
-    std::map<int32_t, size_t> table_column_to_file_position;
+// Build a map from table column id to file slot rewrite info for all columns in the given mappings that have a file column id and are present in the file request.
+static std::map<int32_t, FileSlotRewriteInfo> build_file_slot_rewrite_map(
+        const std::vector<ColumnMapping>& mappings, const FileScanRequest& 
file_request) {
+    std::map<int32_t, FileSlotRewriteInfo> table_column_to_file_slot;
     for (const auto& mapping : mappings) {
         if (!mapping.file_column_id.has_value()) {
             continue;
         }
         const auto position_it = 
file_request.column_positions.find(*mapping.file_column_id);
         if (position_it != file_request.column_positions.end()) {
-            table_column_to_file_position.emplace(mapping.table_column_id, 
position_it->second);
+            table_column_to_file_slot.emplace(
+                    mapping.table_column_id,
+                    FileSlotRewriteInfo {.block_position = position_it->second,
+                                         .file_type = mapping.file_type,
+                                         .table_type = mapping.table_type,
+                                         .file_column_name = 
mapping.file_column_name});
         }
     }
-    return table_column_to_file_position;
+    return table_column_to_file_slot;
 }
 
 static bool is_complex_type(const DataTypePtr& type) {
@@ -348,9 +369,9 @@ Status TableColumnMapper::localize_filters(const 
std::vector<TableFilter>& table
         add_scan_column(file_request, *mapping->file_column_id, 
&file_request->predicate_columns);
     }
 
-    // Build the complete table-slot to file-block position map after all predicate columns have
-    // been assigned. This keeps expression localization independent from filter iteration order.
-    const auto table_column_to_file_position = build_file_position_map(_mappings, *file_request);
+    // Build the complete table-slot rewrite map after all predicate columns have been assigned.
+    // This keeps expression localization independent from filter iteration order.
+    const auto table_column_to_file_slot = build_file_slot_rewrite_map(_mappings, *file_request);
     for (const auto& table_filter : table_filters) {
         if (!table_filter.can_be_localized()) {
             continue;
@@ -359,7 +380,7 @@ Status TableColumnMapper::localize_filters(const 
std::vector<TableFilter>& table
             FileExpressionFilter expression_filter;
             expression_filter.conjunct =
                     
VExprContext::create_shared(rewrite_table_expr_to_file_expr(
-                            table_filter.conjunct->root(), 
table_column_to_file_position));
+                            table_filter.conjunct->root(), 
table_column_to_file_slot));
             
expression_filter.file_column_ids.reserve(table_filter.slot_ids.size());
             for (const auto table_column_id : table_filter.slot_ids) {
                 const auto* mapping = _find_mapping(table_column_id);
diff --git a/be/src/format/reader/file_reader.h 
b/be/src/format/reader/file_reader.h
index 69720bc8f9a..28de8f068b0 100644
--- a/be/src/format/reader/file_reader.h
+++ b/be/src/format/reader/file_reader.h
@@ -106,7 +106,7 @@ struct FileScanRequest {
 
     std::vector<ColumnId> predicate_columns;
     std::vector<ColumnId> non_predicate_columns;
-    std::map<ColumnId, size_t> column_positions;
+    std::map<ColumnId, size_t> column_positions; // file_column_id -> file-local block position
     std::map<ColumnId, FieldProjection> complex_projections;
     std::vector<FileExpressionFilter> expression_filters;
     std::vector<FileColumnPredicateFilter> column_predicate_filters;
diff --git a/be/test/format/reader/expr/cast_test.cpp 
b/be/test/format/reader/expr/cast_test.cpp
index 4f215418953..cab4e6c5b0d 100644
--- a/be/test/format/reader/expr/cast_test.cpp
+++ b/be/test/format/reader/expr/cast_test.cpp
@@ -64,6 +64,33 @@ protected:
     MockRuntimeState state;
 };
 
+class Int64ChildGreaterThanExpr final : public VExpr {
+public:
+    explicit Int64ChildGreaterThanExpr(int64_t value)
+            : VExpr(std::make_shared<DataTypeUInt8>(), false), _value(value) {}
+
+    Status execute_column_impl(VExprContext* context, const Block* block, 
const Selector* selector,
+                               size_t count, ColumnPtr& result_column) const 
override {
+        ColumnPtr child_column;
+        RETURN_IF_ERROR(get_child(0)->execute_column(context, block, selector, 
count, child_column));
+        const auto& input = assert_cast<const ColumnInt64&>(*child_column);
+        auto result = ColumnUInt8::create();
+        auto& result_data = result->get_data();
+        result_data.resize(count);
+        for (size_t row = 0; row < count; ++row) {
+            result_data[row] = input.get_element(row) > _value;
+        }
+        result_column = std::move(result);
+        return Status::OK();
+    }
+
+    const std::string& expr_name() const override { return _expr_name; }
+
+private:
+    const int64_t _value;
+    const std::string _expr_name = "Int64ChildGreaterThanExpr";
+};
+
 TEST_F(CastTest, CastIntSlotToBigInt) {
     auto source_type = std::make_shared<DataTypeInt32>();
     auto return_type = std::make_shared<DataTypeInt64>();
@@ -189,6 +216,9 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastProjectionForTypeMismatch) {
     auto status = mapper.create_mapping(projected_columns, {}, file_schema);
     ASSERT_TRUE(status.ok()) << status;
     ASSERT_EQ(mapper.mappings().size(), 1);
+    reader::FileScanRequest file_request;
+    status = mapper.create_scan_request({}, {}, projected_columns, 
&file_request);
+    ASSERT_TRUE(status.ok()) << status;
     const auto& mapping = mapper.mappings()[0];
     EXPECT_FALSE(mapping.is_trivial);
     ASSERT_NE(mapping.projection, nullptr);
@@ -207,4 +237,63 @@ TEST_F(CastTest, 
ColumnMapperBuildsCastProjectionForTypeMismatch) {
     mapping.projection->close();
 }
 
+TEST_F(CastTest, ColumnMapperBuildsCastFilterForTypeMismatch) {
+    reader::TableColumnMapper mapper;
+    reader::TableColumn table_column;
+    table_column.id = 7;
+    table_column.name = "value";
+    table_column.type = std::make_shared<DataTypeInt64>();
+    std::vector<reader::TableColumn> projected_columns {table_column};
+
+    reader::SchemaField file_field;
+    file_field.id = 0;
+    file_field.name = "value";
+    file_field.type = std::make_shared<DataTypeInt32>();
+    std::vector<reader::SchemaField> file_schema {file_field};
+
+    auto status = mapper.create_mapping(projected_columns, {}, file_schema);
+    ASSERT_TRUE(status.ok()) << status;
+
+    auto predicate = std::make_shared<Int64ChildGreaterThanExpr>(15);
+    predicate->add_child(TableSlotRef::create_shared(7, 7, -1, 
table_column.type, "value"));
+    reader::TableFilter table_filter;
+    table_filter.conjunct = VExprContext::create_shared(predicate);
+    table_filter.slot_ids = {7};
+
+    reader::FileScanRequest file_request;
+    ASSERT_TRUE(mapper.create_scan_request({table_filter}, {}, 
projected_columns, &file_request)
+                        .ok());
+    ASSERT_EQ(file_request.expression_filters.size(), 1);
+    ASSERT_EQ(file_request.predicate_columns, 
std::vector<reader::ColumnId>({0}));
+    const auto& localized_expr = 
file_request.expression_filters[0].conjunct->root();
+    ASSERT_EQ(localized_expr->get_num_children(), 1);
+    const auto& localized_child = localized_expr->children()[0];
+    ASSERT_NE(dynamic_cast<const Cast*>(localized_child.get()), nullptr);
+    ASSERT_EQ(localized_child->get_num_children(), 1);
+    const auto* localized_slot =
+            assert_cast<const 
TableSlotRef*>(localized_child->children()[0].get());
+    EXPECT_EQ(localized_slot->column_id(), 0);
+    EXPECT_TRUE(localized_slot->data_type()->equals(*file_field.type));
+    EXPECT_TRUE(localized_child->data_type()->equals(*table_column.type));
+
+    Block block;
+    block.insert(ColumnHelper::create_column_with_name<DataTypeInt32>({11, 
22}));
+    auto* conjunct = file_request.expression_filters[0].conjunct.get();
+    status = conjunct->prepare(&state, RowDescriptor());
+    ASSERT_TRUE(status.ok()) << status;
+    status = conjunct->open(&state);
+    ASSERT_TRUE(status.ok()) << status;
+    IColumn::Filter filter(block.rows(), 1);
+    bool can_filter_all = false;
+    status = conjunct->execute_filter(&block, filter.data(), block.rows(), 
false,
+                                      &can_filter_all);
+    ASSERT_TRUE(status.ok()) << status;
+    EXPECT_FALSE(can_filter_all);
+    ASSERT_EQ(filter.size(), 2);
+    EXPECT_EQ(filter[0], 0);
+    EXPECT_EQ(filter[1], 1);
+
+    file_request.expression_filters[0].conjunct->close();
+}
+
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to