This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 1a450a2d778 [Opt](exec) opt variant column in join op (#49049)
1a450a2d778 is described below
commit 1a450a2d77831aa3270916d86dd2d0491c7b3de3
Author: HappenLee <[email protected]>
AuthorDate: Fri Mar 14 16:50:34 2025 +0800
[Opt](exec) opt variant column in join op (#49049)
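Refactor ColumnObject::insert_indices_from to speed up the join operator: when the source is a finalized scalar variant, rows are copied directly through its root column instead of one try_insert per row, and the hash join finalizes variant columns it outputs (on both the build and probe sides) so this fast path applies.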
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 15 ++-
be/src/pipeline/exec/hashjoin_build_sink.h | 3 +
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 9 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 3 +
.../exec/join/process_hash_table_probe_impl.h | 4 +
be/src/vec/columns/column_const.h | 2 +
be/src/vec/columns/column_nullable.h | 2 +
be/src/vec/columns/column_object.cpp | 24 +++-
be/src/vec/core/block.h | 6 -
be/test/vec/columns/column_object_test.cpp | 127 +++++++++++++++++++++
10 files changed, 181 insertions(+), 14 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index e2b911a583b..61d6e0f30fb 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -410,11 +410,16 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
LOG(INFO) << "build block rows: " << block.rows() << ", columns count: "
<< block.columns()
<< ", bytes/allocated_bytes: " <<
PrettyPrinter::print_bytes(block.bytes()) << "/"
<< PrettyPrinter::print_bytes(block.allocated_bytes());
-
- block.replace_if_overflow();
+ // 1. Dispose the overflow of ColumnString
+ // 2. Finalize the ColumnObject to speed up
+ for (auto& data : block) {
+ data.column =
std::move(*data.column).mutate()->convert_column_if_overflow();
+ if (p._need_finalize_variant_column) {
+ std::move(*data.column).mutate()->finalize();
+ }
+ }
vectorized::ColumnRawPtrs raw_ptrs(_build_expr_ctxs.size());
-
vectorized::ColumnUInt8::MutablePtr null_map_val;
if (p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op ==
TJoinOp::FULL_OUTER_JOIN) {
_convert_block_to_null(block);
@@ -587,9 +592,11 @@ Status HashJoinBuildSinkOperatorX::prepare(RuntimeState*
state) {
for (const auto& tuple_desc : tuple_descs) {
for (const auto& slot_desc : tuple_desc->slots()) {
output_slot_flags.emplace_back(
- _hash_output_slot_ids.empty() ||
std::find(_hash_output_slot_ids.begin(),
_hash_output_slot_ids.end(),
slot_desc->id()) !=
_hash_output_slot_ids.end());
+ if (output_slot_flags.back() &&
slot_desc->type().is_variant_type()) {
+ _need_finalize_variant_column = true;
+ }
}
}
};
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 81100bdbfae..cc5cdf99ab2 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -180,6 +180,9 @@ private:
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _should_keep_column_flags;
bool _should_keep_hash_key_column = false;
+ // if build side has variant column and need output variant column
+ // need to finalize variant column to speed up the join op
+ bool _need_finalize_variant_column = false;
};
template <class HashTableContext>
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index a55379a02ad..7f273680f6d 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -500,16 +500,21 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState*
state) {
// _left_output_slots_flags : column of left table need to output set flag
= true
// _rgiht_output_slots_flags : column of right table need to output set
flag = true
// if _hash_output_slot_ids is empty, means all column of left/right table
need to output.
- auto init_output_slots_flags = [&](auto& tuple_descs, auto&
output_slot_flags) {
+ auto init_output_slots_flags = [&](auto& tuple_descs, auto&
output_slot_flags,
+ bool init_finalize_flag = false) {
for (const auto& tuple_desc : tuple_descs) {
for (const auto& slot_desc : tuple_desc->slots()) {
output_slot_flags.emplace_back(
std::find(_hash_output_slot_ids.begin(),
_hash_output_slot_ids.end(),
slot_desc->id()) !=
_hash_output_slot_ids.end());
+ if (init_finalize_flag && output_slot_flags.back() &&
+ slot_desc->type().is_variant_type()) {
+ _need_finalize_variant_column = true;
+ }
}
}
};
- init_output_slots_flags(_child->row_desc().tuple_descriptors(),
_left_output_slot_flags);
+ init_output_slots_flags(_child->row_desc().tuple_descriptors(),
_left_output_slot_flags, true);
init_output_slots_flags(_build_side_child->row_desc().tuple_descriptors(),
_right_output_slot_flags);
// _other_join_conjuncts are evaluated in the context of the rows produced
by this node
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index b9774b5bb0d..f3f1983975e 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -159,6 +159,8 @@ public:
_join_distribution != TJoinDistributionType::NONE;
}
+ bool need_finalize_variant_column() const { return
_need_finalize_variant_column; }
+
private:
Status _do_evaluate(vectorized::Block& block,
vectorized::VExprContextSPtrs& exprs,
RuntimeProfile::Counter& expr_call_timer,
@@ -181,6 +183,7 @@ private:
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
+ bool _need_finalize_variant_column = false;
std::vector<std::string> _right_table_column_names;
const std::vector<TExpr> _partition_exprs;
};
diff --git a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
index 11a70c5d5f7..b06e7c7c388 100644
--- a/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/pipeline/exec/join/process_hash_table_probe_impl.h
@@ -122,6 +122,10 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
auto& probe_block = _parent->_probe_block;
for (int i = 0; i < output_slot_flags.size(); ++i) {
if (output_slot_flags[i]) {
+ if (auto& p = _parent->parent()->cast<HashJoinProbeOperatorX>();
+ p.need_finalize_variant_column()) {
+
std::move(*probe_block.get_by_position(i).column).mutate()->finalize();
+ }
auto& column = probe_block.get_by_position(i).column;
if (all_match_one) {
mcol[i]->insert_range_from(*column, _probe_indexs[0], size);
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 8015e7e680c..dcb3c8111b3 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -274,6 +274,8 @@ public:
DCHECK(size() > self_row);
data->replace_column_data(rhs, row, self_row);
}
+
+ void finalize() override { data->finalize(); }
};
} // namespace doris::vectorized
#include "common/compile_check_end.h"
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 63ec3225bfd..58599a51f22 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -436,6 +436,8 @@ public:
return nested_column->get_rowset_segment_id();
}
+ void finalize() override { get_nested_column().finalize(); }
+
private:
void _update_has_null();
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index 6b5cd9db56d..dccacd6b813 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1905,9 +1905,29 @@ Status ColumnObject::extract_root(const PathInData&
path, MutableColumnPtr& dst)
void ColumnObject::insert_indices_from(const IColumn& src, const uint32_t*
indices_begin,
const uint32_t* indices_end) {
- for (const auto* x = indices_begin; x != indices_end; ++x) {
- ColumnObject::insert_from(src, *x);
+ // optimize when src and this column are scalar variant, since try_insert
is inefficiency
+ const auto* src_v = check_and_get_column<ColumnObject>(src);
+
+ bool src_can_do_quick_insert =
+ src_v != nullptr && src_v->is_scalar_variant() &&
src_v->is_finalized();
+ // num_rows == 0 means this column is empty, we not need to check it type
+ if (num_rows == 0 && src_can_do_quick_insert) {
+ // add a new root column, and insert from src root column
+ clear();
+ add_sub_column({}, src_v->get_root()->clone_empty(),
src_v->get_root_type());
+
+ get_root()->insert_indices_from(*src_v->get_root(), indices_begin,
indices_end);
+ num_rows += indices_end - indices_begin;
+ } else if (src_can_do_quick_insert && is_scalar_variant() &&
+ src_v->get_root_type()->equals(*get_root_type())) {
+ get_root()->insert_indices_from(*src_v->get_root(), indices_begin,
indices_end);
+ num_rows += indices_end - indices_begin;
+ } else {
+ for (const auto* x = indices_begin; x != indices_end; ++x) {
+ try_insert(src[*x]);
+ }
}
+ finalize();
}
void ColumnObject::for_each_imutable_subcolumn(ImutableColumnCallback
callback) const {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 74a953350d5..59ad41d9ba7 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -149,12 +149,6 @@ public:
element.column = element.column->convert_to_full_column_if_const();
}
- void replace_if_overflow() {
- for (auto& ele : data) {
- ele.column =
std::move(*ele.column).mutate()->convert_column_if_overflow();
- }
- }
-
ColumnWithTypeAndName& safe_get_by_position(size_t position);
const ColumnWithTypeAndName& safe_get_by_position(size_t position) const;
diff --git a/be/test/vec/columns/column_object_test.cpp
b/be/test/vec/columns/column_object_test.cpp
index d9c1b7da2ac..f7810819ff3 100644
--- a/be/test/vec/columns/column_object_test.cpp
+++ b/be/test/vec/columns/column_object_test.cpp
@@ -127,4 +127,131 @@ TEST_F(ColumnObjectTest, test_pop_back_multiple_types) {
EXPECT_EQ(subcolumn.get_least_common_type()->get_name(), "Nothing");
}
+TEST_F(ColumnObjectTest, test_insert_indices_from) {
+ // Test case 1: Insert from scalar variant source to empty destination
+ {
+ // Create source column with scalar values
+ auto src_column = ColumnObject::create(true);
+ Field field_int(123);
+ src_column->try_insert(field_int);
+ Field field_int2(456);
+ src_column->try_insert(field_int2);
+ src_column->finalize();
+ EXPECT_TRUE(src_column->is_scalar_variant());
+ EXPECT_TRUE(src_column->is_finalized());
+ EXPECT_EQ(src_column->size(), 2);
+
+ // Create empty destination column
+ auto dst_column = ColumnObject::create(true);
+ EXPECT_EQ(dst_column->size(), 0);
+
+ // Create indices
+ std::vector<uint32_t> indices = {0, 1};
+
+ // Insert using indices
+ dst_column->insert_indices_from(*src_column, indices.data(),
+ indices.data() + indices.size());
+
+ // Verify results
+ EXPECT_EQ(dst_column->size(), 2);
+ EXPECT_TRUE(dst_column->is_scalar_variant());
+ EXPECT_TRUE(dst_column->is_finalized());
+ EXPECT_EQ(dst_column->get_root_type()->get_name(),
src_column->get_root_type()->get_name());
+
+ Field result1;
+ dst_column->get(0, result1);
+ EXPECT_EQ(result1.get<VariantMap>().at("").get<Int64>(), 123);
+
+ Field result2;
+ dst_column->get(1, result2);
+ EXPECT_EQ(result2.get<VariantMap>().at("").get<Int64>(), 456);
+ }
+
+ // Test case 2: Insert from scalar variant source to non-empty destination
of same type
+ {
+ // Create source column with scalar values
+ auto src_column = ColumnObject::create(true);
+ Field field_int(123);
+ src_column->try_insert(field_int);
+ Field field_int2(456);
+ src_column->try_insert(field_int2);
+ src_column->finalize();
+ EXPECT_TRUE(src_column->is_scalar_variant());
+
+ // Create destination column with same type
+ auto dst_column = ColumnObject::create(true);
+ Field field_int3(789);
+ dst_column->try_insert(field_int3);
+ dst_column->finalize();
+ EXPECT_TRUE(dst_column->is_scalar_variant());
+ EXPECT_EQ(dst_column->size(), 1);
+
+ // Create indices for selecting specific elements
+ std::vector<uint32_t> indices = {1, 0};
+
+ // Insert using indices (reversed order)
+ dst_column->insert_indices_from(*src_column, indices.data(),
+ indices.data() + indices.size());
+
+ // Verify results
+ EXPECT_EQ(dst_column->size(), 3);
+
+ Field result1, result2, result3;
+ dst_column->get(0, result1);
+ dst_column->get(1, result2);
+ dst_column->get(2, result3);
+
+ EXPECT_EQ(result1.get<VariantMap>().at("").get<Int64>(), 789);
+ EXPECT_EQ(result2.get<VariantMap>().at("").get<Int64>(), 456);
+ EXPECT_EQ(result3.get<VariantMap>().at("").get<Int64>(), 123);
+ }
+
+ // Test case 3: Insert from non-scalar or different type source (fallback
to try_insert)
+ {
+ // Create source column with object values (non-scalar)
+ auto src_column = ColumnObject::create(true);
+
+ // Create a map with {"a": 123}
+ Field field_map = VariantMap();
+ auto& map1 = field_map.get<VariantMap&>();
+ map1["a"] = 123;
+ src_column->try_insert(field_map);
+
+ // Create another map with {"b": "hello"}
+ field_map = VariantMap();
+ auto& map2 = field_map.get<VariantMap&>();
+ map2["b"] = String("hello");
+ src_column->try_insert(field_map);
+
+ src_column->finalize();
+ EXPECT_FALSE(src_column->is_scalar_variant());
+
+ // Create destination column (empty)
+ auto dst_column = ColumnObject::create(true);
+
+ // Create indices
+ std::vector<uint32_t> indices = {1, 0};
+
+ // Insert using indices
+ dst_column->insert_indices_from(*src_column, indices.data(),
+ indices.data() + indices.size());
+
+ // Verify results
+ EXPECT_EQ(dst_column->size(), 2);
+
+ Field result1, result2;
+ dst_column->get(0, result1);
+ dst_column->get(1, result2);
+
+ EXPECT_TRUE(result1.get_type() == Field::Types::VariantMap);
+ EXPECT_TRUE(result2.get_type() == Field::Types::VariantMap);
+
+ const auto& result1_map = result1.get<const VariantMap&>();
+ const auto& result2_map = result2.get<const VariantMap&>();
+
+ EXPECT_EQ(result1_map.at("b").get<const String&>(), "hello");
+ EXPECT_EQ(result2_map.at("a").get<Int64>(), 123);
+ }
+}
+
} // namespace doris::vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]