This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b5da3f74f5 [improvement](join) avoid unnecessary copying in
_build_output_block (#21360)
b5da3f74f5 is described below
commit b5da3f74f507752fd6327e7e9af14f7e8052bfcb
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Jul 4 12:13:49 2023 +0800
[improvement](join) avoid unnecessary copying in _build_output_block
(#21360)
If the source columns in the temporary block are exclusively owned (not shared
with any other holder), there is no need to copy their data.
---
be/src/vec/exec/join/vhash_join_node.cpp | 7 +++-
be/src/vec/exec/join/vjoin_node_base.cpp | 44 ++++++++++++++++++-------
be/src/vec/exec/join/vjoin_node_base.h | 2 +-
be/src/vec/exec/join/vnested_loop_join_node.cpp | 6 +++-
4 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 678d2f6483..074e210bf5 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -623,7 +623,12 @@ Status HashJoinNode::pull(doris::RuntimeState* state,
vectorized::Block* output_
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(VExprContext::filter_block(_conjuncts, &temp_block,
temp_block.columns()));
}
- RETURN_IF_ERROR(_build_output_block(&temp_block, output_block));
+
+ // Make _join_block release its column pointers here
+ _join_block.set_columns(_join_block.clone_empty_columns());
+ mutable_join_block.clear();
+
+ RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
_reset_tuple_is_null_column();
reached_limit(output_block, eos);
return Status::OK();
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 57870a0ac8..a4e1493d58 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -33,6 +33,7 @@
#include "util/telemetry/telemetry.h"
#include "util/threadpool.h"
#include "vec/columns/column.h"
+#include "vec/columns/column_array.h"
#include "vec/columns/column_nullable.h"
#include "vec/columns/column_vector.h"
#include "vec/columns/columns_number.h"
@@ -149,7 +150,8 @@ void VJoinNodeBase::_construct_mutable_join_block() {
}
}
-Status VJoinNodeBase::_build_output_block(Block* origin_block, Block*
output_block) {
+Status VJoinNodeBase::_build_output_block(Block* origin_block, Block*
output_block,
+ bool keep_origin) {
SCOPED_TIMER(_build_output_block_timer);
auto is_mem_reuse = output_block->mem_reuse();
MutableBlock mutable_block =
@@ -160,13 +162,21 @@ Status VJoinNodeBase::_build_output_block(Block*
origin_block, Block* output_blo
// TODO: After FE plan support same nullable of output expr and origin
block and mutable column
// we should replace `insert_column_datas` by `insert_range_from`
- auto insert_column_datas = [](auto& to, const auto& from, size_t rows) {
- if (to->is_nullable() && !from.is_nullable()) {
- auto& null_column = reinterpret_cast<ColumnNullable&>(*to);
- null_column.get_nested_column().insert_range_from(from, 0, rows);
- null_column.get_null_map_column().get_data().resize_fill(rows, 0);
+ auto insert_column_datas = [keep_origin](auto& to, ColumnPtr& from, size_t
rows) {
+ if (to->is_nullable() && !from->is_nullable()) {
+ if (keep_origin || !from->is_exclusive()) {
+ auto& null_column = reinterpret_cast<ColumnNullable&>(*to);
+ null_column.get_nested_column().insert_range_from(*from, 0,
rows);
+ null_column.get_null_map_column().get_data().resize_fill(rows,
0);
+ } else {
+ to = make_nullable(from, false)->assume_mutable();
+ }
} else {
- to->insert_range_from(from, 0, rows);
+ if (keep_origin || !from->is_exclusive()) {
+ to->insert_range_from(*from, 0, rows);
+ } else {
+ to = from->assume_mutable();
+ }
}
};
if (rows != 0) {
@@ -174,7 +184,7 @@ Status VJoinNodeBase::_build_output_block(Block*
origin_block, Block* output_blo
if (_output_expr_ctxs.empty()) {
DCHECK(mutable_columns.size() ==
row_desc().num_materialized_slots());
for (int i = 0; i < mutable_columns.size(); ++i) {
- insert_column_datas(mutable_columns[i],
*origin_block->get_by_position(i).column,
+ insert_column_datas(mutable_columns[i],
origin_block->get_by_position(i).column,
rows);
}
} else {
@@ -183,13 +193,23 @@ Status VJoinNodeBase::_build_output_block(Block*
origin_block, Block* output_blo
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(_output_expr_ctxs[i]->execute(origin_block,
&result_column_id));
- auto column_ptr =
origin_block->get_by_position(result_column_id)
-
.column->convert_to_full_column_if_const();
- insert_column_datas(mutable_columns[i], *column_ptr, rows);
+ auto& origin_column =
origin_block->get_by_position(result_column_id).column;
+
+ /// `convert_to_full_column_if_const` will create a new pointer to
the origin column if
+ /// the origin column is not ColumnConst/ColumnArray, which
makes the column
+ /// non-exclusive.
+ /// TODO: we may need a method to check whether a column needs to be
converted to a full
+ /// column.
+ if (is_column_const(*origin_column) ||
check_column<ColumnArray>(origin_column)) {
+ auto column_ptr =
origin_column->convert_to_full_column_if_const();
+ insert_column_datas(mutable_columns[i], column_ptr, rows);
+ } else {
+ insert_column_datas(mutable_columns[i], origin_column,
rows);
+ }
}
}
- if (!is_mem_reuse) {
+ if (!is_mem_reuse || !keep_origin) {
output_block->swap(mutable_block.to_block());
}
DCHECK(output_block->rows() == rows);
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index f29897719d..120e77785e 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -80,7 +80,7 @@ protected:
// Convert the intermediate blocks to the final result. For example, if
the block from probe
// side is non-nullable and the join op is a right outer join, we need to
convert the non-nullable
// columns from probe side to a nullable column.
- Status _build_output_block(Block* origin_block, Block* output_block);
+ Status _build_output_block(Block* origin_block, Block* output_block, bool
keep_origin = true);
// Open probe side asynchronously.
void _probe_side_open_thread(RuntimeState* state, std::promise<Status>*
status);
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 0b50860874..a3b6c30c9f 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -672,13 +672,17 @@ Status VNestedLoopJoinNode::pull(RuntimeState* state,
vectorized::Block* block,
{
Block tmp_block = _join_block;
+
+ // Make _join_block release its column pointers here
+ _join_block.set_columns(_join_block.clone_empty_columns());
+
_add_tuple_is_null_column(&tmp_block);
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
VExprContext::filter_block(_conjuncts, &tmp_block,
tmp_block.columns()));
}
- RETURN_IF_ERROR(_build_output_block(&tmp_block, block));
+ RETURN_IF_ERROR(_build_output_block(&tmp_block, block, false));
_reset_tuple_is_null_column();
}
_join_block.clear_column_data();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]