This is an automated email from the ASF dual-hosted git repository.

BiteTheDDDDt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d3bd8a037e [fix](be) Rebind storage common expr slots (#63279)
8d3bd8a037e is described below

commit 8d3bd8a037e9ab00875857e58f9619be4f83c7ef
Author: Pxl <[email protected]>
AuthorDate: Tue May 19 12:45:58 2026 +0800

    [fix](be) Rebind storage common expr slots (#63279)
    
    Problem Summary: #62222 allowed key-only common expressions to be pushed
    down for aggregation/merge readers. Those expressions are prepared with
    scan tuple column positions, but SegmentIterator evaluates them on
    storage reader blocks whose schema can be expanded for aggregation/merge
    readers. This can make slot refs read the wrong column and trigger type
    mismatches such as BIGINT vs INT in eq predicates.
---
 be/src/exprs/virtual_slot_ref.h                    |   2 +-
 be/src/exprs/vslot_ref.h                           |   2 +-
 be/src/storage/segment/segment_iterator.cpp        | 119 +++++++++++++++++++++
 .../predicate_infer/infer_predicate.groovy         |  36 +++++++
 4 files changed, 157 insertions(+), 2 deletions(-)

diff --git a/be/src/exprs/virtual_slot_ref.h b/be/src/exprs/virtual_slot_ref.h
index ce5219a9a7b..721203f7de9 100644
--- a/be/src/exprs/virtual_slot_ref.h
+++ b/be/src/exprs/virtual_slot_ref.h
@@ -40,6 +40,7 @@ public:
     int slot_id() const { return _slot_id; }
     bool equals(const VExpr& other) override;
     size_t estimate_memory(const size_t rows) override { return 0; }
+    void set_column_id(int column_id) { _column_id = column_id; }
     void collect_slot_column_ids(std::set<int>& column_ids) const override {
         column_ids.insert(_column_id);
     }
@@ -111,7 +112,6 @@ public:
 
 #ifdef BE_TEST
     // Test-only setter methods for unit testing
-    void set_column_id(int column_id) { _column_id = column_id; }
     void set_column_name(const std::string* column_name) { _column_name = 
column_name; }
     void set_column_data_type(DataTypePtr column_data_type) {
         _column_data_type = std::move(column_data_type);
diff --git a/be/src/exprs/vslot_ref.h b/be/src/exprs/vslot_ref.h
index 21b5735753b..ef61edc384c 100644
--- a/be/src/exprs/vslot_ref.h
+++ b/be/src/exprs/vslot_ref.h
@@ -39,9 +39,9 @@ public:
     VSlotRef(const SlotDescriptor* desc);
 #ifdef BE_TEST
     VSlotRef() = default;
-    void set_column_id(int column_id) { _column_id = column_id; }
     void set_slot_id(int slot_id) { _slot_id = slot_id; }
 #endif
+    void set_column_id(int column_id) { _column_id = column_id; }
     Status prepare(RuntimeState* state, const RowDescriptor& desc, 
VExprContext* context) override;
     Status open(RuntimeState* state, VExprContext* context,
                 FunctionContext::FunctionStateScope scope) override;
diff --git a/be/src/storage/segment/segment_iterator.cpp 
b/be/src/storage/segment/segment_iterator.cpp
index 9ec76a84778..f3931c72f6d 100644
--- a/be/src/storage/segment/segment_iterator.cpp
+++ b/be/src/storage/segment/segment_iterator.cpp
@@ -69,6 +69,7 @@
 #include "io/cache/cached_remote_file_reader.h"
 #include "io/fs/file_reader.h"
 #include "io/io_common.h"
+#include "runtime/descriptors.h"
 #include "runtime/query_context.h"
 #include "runtime/runtime_predicate.h"
 #include "runtime/runtime_state.h"
@@ -109,11 +110,127 @@
 #include "storage/utils.h"
 #include "util/concurrency_stats.h"
 #include "util/defer_op.h"
+#include "util/json/path_in_data.h"
 #include "util/simd/bits.h"
 
 namespace doris {
 using namespace ErrorCode;
 namespace segment_v2 {
+namespace {
+
+Status tablet_column_id_by_slot(const TabletSchemaSPtr& tablet_schema, const 
SlotDescriptor* slot,
+                                ColumnId* cid) {
+    int32_t field_index = -1;
+    if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) {
+        field_index = tablet_schema->field_index(
+                
PathInData(tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(),
+                           slot->column_paths()));
+    } else {
+        field_index = slot->col_unique_id() >= 0 ? 
tablet_schema->field_index(slot->col_unique_id())
+                                                 : 
tablet_schema->field_index(slot->col_name());
+    }
+    if (field_index < 0) {
+        return Status::InternalError(
+                "field name is invalid. field={}, field_name_to_index={}, 
col_unique_id={}",
+                slot->col_name(), tablet_schema->get_all_field_names(), 
slot->col_unique_id());
+    }
+    *cid = field_index;
+    return Status::OK();
+}
+
+Status rebind_storage_expr_to_reader_schema(
+        const StorageReadOptions& opts, const VExprSPtr& expr,
+        const std::unordered_map<ColumnId, size_t>& cid_to_pos) {
+    DORIS_CHECK(expr != nullptr);
+
+    if (expr->is_slot_ref()) {
+        auto slot_ref = std::static_pointer_cast<VSlotRef>(expr);
+        auto* slot = 
opts.runtime_state->desc_tbl().get_slot_descriptor(slot_ref->slot_id());
+        if (slot == nullptr) {
+            return Status::InternalError("slot {} is not found in descriptor 
table",
+                                         slot_ref->slot_id());
+        }
+
+        ColumnId cid = 0;
+        RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot, 
&cid));
+        auto pos_it = cid_to_pos.find(cid);
+        if (pos_it == cid_to_pos.end()) {
+            return Status::InternalError("slot {} column {} with cid {} is not 
in reader schema",
+                                         slot_ref->slot_id(), 
slot->col_name(), cid);
+        }
+        slot_ref->set_column_id(cast_set<int>(pos_it->second));
+    } else if (expr->is_virtual_slot_ref()) {
+        auto virtual_slot_ref = std::static_pointer_cast<VirtualSlotRef>(expr);
+        auto* slot =
+                
opts.runtime_state->desc_tbl().get_slot_descriptor(virtual_slot_ref->slot_id());
+        if (slot == nullptr) {
+            return Status::InternalError("slot {} is not found in descriptor 
table",
+                                         virtual_slot_ref->slot_id());
+        }
+
+        ColumnId cid = 0;
+        RETURN_IF_ERROR(tablet_column_id_by_slot(opts.tablet_schema, slot, 
&cid));
+        auto pos_it = cid_to_pos.find(cid);
+        if (pos_it == cid_to_pos.end()) {
+            return Status::InternalError(
+                    "virtual slot {} column {} with cid {} is not in reader 
schema",
+                    virtual_slot_ref->slot_id(), slot->col_name(), cid);
+        }
+        virtual_slot_ref->set_column_id(cast_set<int>(pos_it->second));
+        // A virtual slot has its own output position in the reader block, and 
its
+        // materialization expression may also contain real slot refs. Rebind 
both
+        // sides so evaluating the virtual expression reads from the same block
+        // layout used by SegmentIterator.
+        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(
+                opts, virtual_slot_ref->get_virtual_column_expr(), 
cid_to_pos));
+    }
+
+    for (const auto& child : expr->children()) {
+        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, child, 
cid_to_pos));
+    }
+    return Status::OK();
+}
+
+Status rebind_storage_exprs_to_reader_schema(const StorageReadOptions& opts, 
const Schema& schema,
+                                             const VExprContextSPtrs& 
common_exprs,
+                                             std::map<ColumnId, 
VExprContextSPtr>& virtual_exprs) {
+    if (common_exprs.empty() && virtual_exprs.empty()) {
+        return Status::OK();
+    }
+    DORIS_CHECK(opts.runtime_state != nullptr);
+    DORIS_CHECK(opts.tablet_schema != nullptr);
+
+    const auto keys_type = opts.tablet_schema->keys_type();
+    if (keys_type == KeysType::DUP_KEYS ||
+        (keys_type == KeysType::UNIQUE_KEYS && 
opts.enable_unique_key_merge_on_write)) {
+        return Status::OK();
+    }
+
+    // Storage exprs are prepared with RowDescriptor, so 
VSlotRef/VirtualSlotRef column_id points to
+    // the scan tuple column ordinal. SegmentIterator evaluates cloned exprs 
on a block built from
+    // the reader schema instead. AGG_KEYS and non-MOW UNIQUE_KEYS readers may 
expand the reader
+    // schema, for example by filling all key columns before 
merging/aggregating rows, so the scan
+    // tuple ordinal is not always the same as the runtime block ordinal.
+    //
+    // DUP_KEYS and UNIQUE_KEYS MOW use direct readers for query scans, so 
their reader block keeps
+    // the scan tuple layout and can skip this per-segment expression-tree 
traversal. For merge/agg
+    // readers, the reader schema is the source of truth: map tablet column id 
to reader-block
+    // position and rebind every storage expr slot to that position.
+    std::unordered_map<ColumnId, size_t> cid_to_pos;
+    for (size_t pos = 0; pos < schema.num_column_ids(); ++pos) {
+        cid_to_pos.emplace(schema.column_id(cast_set<int>(pos)), pos);
+    }
+
+    for (const auto& ctx : common_exprs) {
+        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, 
ctx->root(), cid_to_pos));
+    }
+    for (const auto& [_, ctx] : virtual_exprs) {
+        RETURN_IF_ERROR(rebind_storage_expr_to_reader_schema(opts, 
ctx->root(), cid_to_pos));
+    }
+    return Status::OK();
+}
+
+} // namespace
 
 SegmentIterator::~SegmentIterator() = default;
 
@@ -3310,6 +3427,8 @@ Status 
SegmentIterator::_construct_compound_expr_context() {
         context->set_index_context(inverted_index_context);
         expr_ctx = context;
     }
+    RETURN_IF_ERROR(rebind_storage_exprs_to_reader_schema(
+            _opts, *_schema, _common_expr_ctxs_push_down, 
_virtual_column_exprs));
     return Status::OK();
 }
 
diff --git 
a/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
 
b/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
index 5e4207fa2e0..e2dd89be33f 100644
--- 
a/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
+++ 
b/regression-test/suites/nereids_rules_p0/predicate_infer/infer_predicate.groovy
@@ -307,4 +307,40 @@ suite("infer_predicate") {
     qt_infer11 """
         explain shape plan select * from (select * from t1 where t1.id = 
12345) t1 join t2 on cast(t1.id as largeint) = cast(t2.id as largeint);
     """
+
+    sql "DROP TABLE IF EXISTS infer_predicate_cast_common_expr_agg"
+    sql """
+        CREATE TABLE infer_predicate_cast_common_expr_agg (
+            pk int,
+            col_int int,
+            col_bigint bigint,
+            col_bitmap bitmap bitmap_union
+        )
+        AGGREGATE KEY(pk, col_int, col_bigint)
+        DISTRIBUTED BY HASH(pk) BUCKETS 1
+        PROPERTIES (
+            "replication_num" = "1"
+        )
+    """
+    sql """
+        INSERT INTO infer_predicate_cast_common_expr_agg VALUES
+            (1, 10, 1, to_bitmap(1)),
+            (2, 20, 3, to_bitmap(2)),
+            (3, 30, 3, to_bitmap(3))
+    """
+    test {
+        sql """
+            SELECT pk, col_bigint, bitmap_union_count(col_bitmap)
+            FROM infer_predicate_cast_common_expr_agg
+            WHERE col_bigint = CAST(pk AS BIGINT)
+            GROUP BY pk, col_bigint
+            ORDER BY pk, col_bigint
+        """
+        check { result, exception, startTime, endTime ->
+            if (exception != null) {
+                throw exception
+            }
+            assertEquals("[[1, 1, 1], [3, 3, 1]]", result.toString())
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org

Reply via email to