This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a450a2d778 [Opt](exec) opt variant column in join op  (#49049)
1a450a2d778 is described below

commit 1a450a2d77831aa3270916d86dd2d0491c7b3de3
Author: HappenLee <[email protected]>
AuthorDate: Fri Mar 14 16:50:34 2025 +0800

    [Opt](exec) opt variant column in join op  (#49049)
    
    Refactor ColumnObject::insert_indices_from to speed up join op
---
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |  15 ++-
 be/src/pipeline/exec/hashjoin_build_sink.h         |   3 +
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |   9 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   3 +
 .../exec/join/process_hash_table_probe_impl.h      |   4 +
 be/src/vec/columns/column_const.h                  |   2 +
 be/src/vec/columns/column_nullable.h               |   2 +
 be/src/vec/columns/column_object.cpp               |  24 +++-
 be/src/vec/core/block.h                            |   6 -
 be/test/vec/columns/column_object_test.cpp         | 127 +++++++++++++++++++++
 10 files changed, 181 insertions(+), 14 deletions(-)

diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e2b911a583b..61d6e0f30fb 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -410,11 +410,16 @@ Status 
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
     LOG(INFO) << "build block rows: " << block.rows() << ", columns count: " 
<< block.columns()
               << ", bytes/allocated_bytes: " << 
PrettyPrinter::print_bytes(block.bytes()) << "/"
               << PrettyPrinter::print_bytes(block.allocated_bytes());
-
-    block.replace_if_overflow();
+    // 1. Dispose the overflow of ColumnString
+    // 2. Finalize the ColumnObject to speed up
+    for (auto& data : block) {
+        data.column = 
std::move(*data.column).mutate()->convert_column_if_overflow();
+        if (p._need_finalize_variant_column) {
+            std::move(*data.column).mutate()->finalize();
+        }
+    }
 
     vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
-
     vectorized::ColumnUInt8::MutablePtr null_map_val;
     if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op == 
TJoinOp::FULL_OUTER_JOIN) {
         _convert_block_to_null(block);
@@ -587,9 +592,11 @@ Status HashJoinBuildSinkOperatorX::prepare(RuntimeState* 
state) {
         for (const auto& tuple_desc : tuple_descs) {
             for (const auto& slot_desc : tuple_desc->slots()) {
                 output_slot_flags.emplace_back(
-                        _hash_output_slot_ids.empty() ||
                         std::find(_hash_output_slot_ids.begin(), 
_hash_output_slot_ids.end(),
                                   slot_desc->id()) != 
_hash_output_slot_ids.end());
+                if (output_slot_flags.back() && 
slot_desc->type().is_variant_type()) {
+                    _need_finalize_variant_column = true;
+                }
             }
         }
     };
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 81100bdbfae..cc5cdf99ab2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -180,6 +180,9 @@ private:
     std::vector<SlotId> _hash_output_slot_ids;
     std::vector<bool> _should_keep_column_flags;
     bool _should_keep_hash_key_column = false;
+    // If the build side has a variant column that needs to be output,
+    // the variant column must be finalized to speed up the join op.
+    bool _need_finalize_variant_column = false;
 };
 
 template <class HashTableContext>
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index a55379a02ad..7f273680f6d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -500,16 +500,21 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState* 
state) {
     // _left_output_slot_flags : column of left table need to output set flag 
= true
     // _right_output_slot_flags : column of right table need to output set 
flag = true
     // if _hash_output_slot_ids is empty, means all column of left/right table 
need to output.
-    auto init_output_slots_flags = [&](auto& tuple_descs, auto& 
output_slot_flags) {
+    auto init_output_slots_flags = [&](auto& tuple_descs, auto& 
output_slot_flags,
+                                       bool init_finalize_flag = false) {
         for (const auto& tuple_desc : tuple_descs) {
             for (const auto& slot_desc : tuple_desc->slots()) {
                 output_slot_flags.emplace_back(
                         std::find(_hash_output_slot_ids.begin(), 
_hash_output_slot_ids.end(),
                                   slot_desc->id()) != 
_hash_output_slot_ids.end());
+                if (init_finalize_flag && output_slot_flags.back() &&
+                    slot_desc->type().is_variant_type()) {
+                    _need_finalize_variant_column = true;
+                }
             }
         }
     };
-    init_output_slots_flags(_child->row_desc().tuple_descriptors(), 
_left_output_slot_flags);
+    init_output_slots_flags(_child->row_desc().tuple_descriptors(), 
_left_output_slot_flags, true);
     init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
                             _right_output_slot_flags);
     // _other_join_conjuncts are evaluated in the context of the rows produced 
by this node
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b9774b5bb0d..f3f1983975e 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -159,6 +159,8 @@ public:
                _join_distribution != TJoinDistributionType::NONE;
     }
 
+    bool need_finalize_variant_column() const { return 
_need_finalize_variant_column; }
+
 private:
     Status _do_evaluate(vectorized::Block& block, 
vectorized::VExprContextSPtrs& exprs,
                         RuntimeProfile::Counter& expr_call_timer,
@@ -181,6 +183,7 @@ private:
     std::vector<SlotId> _hash_output_slot_ids;
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
+    bool _need_finalize_variant_column = false;
     std::vector<std::string> _right_table_column_names;
     const std::vector<TExpr> _partition_exprs;
 };
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h 
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 11a70c5d5f7..b06e7c7c388 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -122,6 +122,10 @@ void 
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
     auto& probe_block = _parent->_probe_block;
     for (int i = 0; i < output_slot_flags.size(); ++i) {
         if (output_slot_flags[i]) {
+            if (auto& p = _parent->parent()->cast<HashJoinProbeOperatorX>();
+                p.need_finalize_variant_column()) {
+                
std::move(*probe_block.get_by_position(i).column).mutate()->finalize();
+            }
             auto& column = probe_block.get_by_position(i).column;
             if (all_match_one) {
                 mcol[i]->insert_range_from(*column, _probe_indexs[0], size);
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index 8015e7e680c..dcb3c8111b3 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -274,6 +274,8 @@ public:
         DCHECK(size() > self_row);
         data->replace_column_data(rhs, row, self_row);
     }
+
+    void finalize() override { data->finalize(); }
 };
 } // namespace doris::vectorized
 #include "common/compile_check_end.h"
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index 63ec3225bfd..58599a51f22 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -436,6 +436,8 @@ public:
         return nested_column->get_rowset_segment_id();
     }
 
+    void finalize() override { get_nested_column().finalize(); }
+
 private:
     void _update_has_null();
 
diff --git a/be/src/vec/columns/column_object.cpp 
b/be/src/vec/columns/column_object.cpp
index 6b5cd9db56d..dccacd6b813 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1905,9 +1905,29 @@ Status ColumnObject::extract_root(const PathInData& 
path, MutableColumnPtr& dst)
 
 void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t* 
indices_begin,
                                        const uint32_t* indices_end) {
-    for (const auto* x = indices_begin; x != indices_end; ++x) {
-        ColumnObject::insert_from(src, *x);
+    // optimize when src and this column are scalar variant, since try_insert 
is inefficient
+    const auto* src_v = check_and_get_column<ColumnObject>(src);
+
+    bool src_can_do_quick_insert =
+            src_v != nullptr && src_v->is_scalar_variant() && 
src_v->is_finalized();
+    // num_rows == 0 means this column is empty, so we do not need to check its type
+    if (num_rows == 0 && src_can_do_quick_insert) {
+        // add a new root column, and insert from src root column
+        clear();
+        add_sub_column({}, src_v->get_root()->clone_empty(), 
src_v->get_root_type());
+
+        get_root()->insert_indices_from(*src_v->get_root(), indices_begin, 
indices_end);
+        num_rows += indices_end - indices_begin;
+    } else if (src_can_do_quick_insert && is_scalar_variant() &&
+               src_v->get_root_type()->equals(*get_root_type())) {
+        get_root()->insert_indices_from(*src_v->get_root(), indices_begin, 
indices_end);
+        num_rows += indices_end - indices_begin;
+    } else {
+        for (const auto* x = indices_begin; x != indices_end; ++x) {
+            try_insert(src[*x]);
+        }
     }
+    finalize();
 }
 
 void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback 
callback) const {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 74a953350d5..59ad41d9ba7 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -149,12 +149,6 @@ public:
         element.column = element.column->convert_to_full_column_if_const();
     }
 
-    void replace_if_overflow() {
-        for (auto& ele : data) {
-            ele.column = 
std::move(*ele.column).mutate()->convert_column_if_overflow();
-        }
-    }
-
     ColumnWithTypeAndName& safe_get_by_position(size_t position);
     const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;
 
diff --git a/be/test/vec/columns/column_object_test.cpp 
b/be/test/vec/columns/column_object_test.cpp
index d9c1b7da2ac..f7810819ff3 100644
--- a/be/test/vec/columns/column_object_test.cpp
+++ b/be/test/vec/columns/column_object_test.cpp
@@ -127,4 +127,131 @@ TEST_F(ColumnObjectTest, test_pop_back_multiple_types) {
     EXPECT_EQ(subcolumn.get_least_common_type()->get_name(), "Nothing");
 }
 
+TEST_F(ColumnObjectTest, test_insert_indices_from) {
+    // Test case 1: Insert from scalar variant source to empty destination
+    {
+        // Create source column with scalar values
+        auto src_column = ColumnObject::create(true);
+        Field field_int(123);
+        src_column->try_insert(field_int);
+        Field field_int2(456);
+        src_column->try_insert(field_int2);
+        src_column->finalize();
+        EXPECT_TRUE(src_column->is_scalar_variant());
+        EXPECT_TRUE(src_column->is_finalized());
+        EXPECT_EQ(src_column->size(), 2);
+
+        // Create empty destination column
+        auto dst_column = ColumnObject::create(true);
+        EXPECT_EQ(dst_column->size(), 0);
+
+        // Create indices
+        std::vector<uint32_t> indices = {0, 1};
+
+        // Insert using indices
+        dst_column->insert_indices_from(*src_column, indices.data(),
+                                        indices.data() + indices.size());
+
+        // Verify results
+        EXPECT_EQ(dst_column->size(), 2);
+        EXPECT_TRUE(dst_column->is_scalar_variant());
+        EXPECT_TRUE(dst_column->is_finalized());
+        EXPECT_EQ(dst_column->get_root_type()->get_name(), 
src_column->get_root_type()->get_name());
+
+        Field result1;
+        dst_column->get(0, result1);
+        EXPECT_EQ(result1.get<VariantMap>().at("").get<Int64>(), 123);
+
+        Field result2;
+        dst_column->get(1, result2);
+        EXPECT_EQ(result2.get<VariantMap>().at("").get<Int64>(), 456);
+    }
+
+    // Test case 2: Insert from scalar variant source to non-empty destination 
of same type
+    {
+        // Create source column with scalar values
+        auto src_column = ColumnObject::create(true);
+        Field field_int(123);
+        src_column->try_insert(field_int);
+        Field field_int2(456);
+        src_column->try_insert(field_int2);
+        src_column->finalize();
+        EXPECT_TRUE(src_column->is_scalar_variant());
+
+        // Create destination column with same type
+        auto dst_column = ColumnObject::create(true);
+        Field field_int3(789);
+        dst_column->try_insert(field_int3);
+        dst_column->finalize();
+        EXPECT_TRUE(dst_column->is_scalar_variant());
+        EXPECT_EQ(dst_column->size(), 1);
+
+        // Create indices for selecting specific elements
+        std::vector<uint32_t> indices = {1, 0};
+
+        // Insert using indices (reversed order)
+        dst_column->insert_indices_from(*src_column, indices.data(),
+                                        indices.data() + indices.size());
+
+        // Verify results
+        EXPECT_EQ(dst_column->size(), 3);
+
+        Field result1, result2, result3;
+        dst_column->get(0, result1);
+        dst_column->get(1, result2);
+        dst_column->get(2, result3);
+
+        EXPECT_EQ(result1.get<VariantMap>().at("").get<Int64>(), 789);
+        EXPECT_EQ(result2.get<VariantMap>().at("").get<Int64>(), 456);
+        EXPECT_EQ(result3.get<VariantMap>().at("").get<Int64>(), 123);
+    }
+
+    // Test case 3: Insert from non-scalar or different type source (fallback 
to try_insert)
+    {
+        // Create source column with object values (non-scalar)
+        auto src_column = ColumnObject::create(true);
+
+        // Create a map with {"a": 123}
+        Field field_map = VariantMap();
+        auto& map1 = field_map.get<VariantMap&>();
+        map1["a"] = 123;
+        src_column->try_insert(field_map);
+
+        // Create another map with {"b": "hello"}
+        field_map = VariantMap();
+        auto& map2 = field_map.get<VariantMap&>();
+        map2["b"] = String("hello");
+        src_column->try_insert(field_map);
+
+        src_column->finalize();
+        EXPECT_FALSE(src_column->is_scalar_variant());
+
+        // Create destination column (empty)
+        auto dst_column = ColumnObject::create(true);
+
+        // Create indices
+        std::vector<uint32_t> indices = {1, 0};
+
+        // Insert using indices
+        dst_column->insert_indices_from(*src_column, indices.data(),
+                                        indices.data() + indices.size());
+
+        // Verify results
+        EXPECT_EQ(dst_column->size(), 2);
+
+        Field result1, result2;
+        dst_column->get(0, result1);
+        dst_column->get(1, result2);
+
+        EXPECT_TRUE(result1.get_type() == Field::Types::VariantMap);
+        EXPECT_TRUE(result2.get_type() == Field::Types::VariantMap);
+
+        const auto& result1_map = result1.get<const VariantMap&>();
+        const auto& result2_map = result2.get<const VariantMap&>();
+
+        EXPECT_EQ(result1_map.at("b").get<const String&>(), "hello");
+        EXPECT_EQ(result2_map.at("a").get<Int64>(), 123);
+    }
+}
+
 } // namespace doris::vectorized


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to