This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 358bd79 [improvement](vec)(Join) Mem reuse to speed up join operator (#7905)
358bd79 is described below
commit 358bd79fb10e571a9fcf54a8726ad9906cdec0e3
Author: HappenLee <[email protected]>
AuthorDate: Mon Jan 31 22:14:12 2022 +0800
[improvement](vec)(Join) Mem reuse to speed up join operator (#7905)
1. Reuse the memory of the output block in the vectorized hash join node
2. Add a `replicate(counts, target_size, column)` overload to the column classes
---
be/src/vec/columns/column.h | 4 ++++
be/src/vec/columns/column_complex.h | 21 ++++++++++++++++++
be/src/vec/columns/column_const.cpp | 6 ++++++
be/src/vec/columns/column_const.h | 1 +
be/src/vec/columns/column_decimal.cpp | 14 ++++++++++++
be/src/vec/columns/column_decimal.h | 3 +++
be/src/vec/columns/column_nullable.cpp | 6 ++++++
be/src/vec/columns/column_nullable.h | 1 +
be/src/vec/columns/column_string.cpp | 31 ++++++++++++++++++++++++++
be/src/vec/columns/column_string.h | 2 ++
be/src/vec/columns/column_vector.cpp | 14 ++++++++++++
be/src/vec/columns/column_vector.h | 2 ++
be/src/vec/common/hash_table/hash_table.h | 2 +-
be/src/vec/common/pod_array.h | 4 ++--
be/src/vec/exec/join/vhash_join_node.cpp | 36 +++++++++++++++----------------
be/src/vec/functions/math.cpp | 6 +++---
16 files changed, 129 insertions(+), 24 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index c730168..3edd25a 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -267,6 +267,10 @@ public:
using Offsets = PaddedPODArray<Offset>;
virtual Ptr replicate(const Offsets& offsets) const = 0;
+ virtual void replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
+ LOG(FATAL) << "not support";
+ };
+
/** Split column to smaller columns. Each value goes to column index,
selected by corresponding element of 'selector'.
* Selector must contain values from 0 to num_columns - 1.
* For default implementation, see scatter_impl.
diff --git a/be/src/vec/columns/column_complex.h
b/be/src/vec/columns/column_complex.h
index 76ad270..576388f 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -191,6 +191,8 @@ public:
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const
override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
+
[[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
const IColumn::Selector& selector)
const override {
LOG(FATAL) << "scatter not implemented";
@@ -298,5 +300,24 @@ ColumnPtr ColumnComplexType<T>::replicate(const
IColumn::Offsets& offsets) const
return res;
}
+template <typename T>
+void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t
target_size, IColumn& column) const {
+ size_t size = data.size();
+ if (0 == size) return;
+
+ auto& res = reinterpret_cast<ColumnComplexType<T>&>(column);
+ typename Self::Container& res_data = res.get_data();
+ res_data.reserve(target_size);
+
+ for (size_t i = 0; i < size; ++i) {
+ size_t size_to_replicate = counts[i];
+ for (size_t j = 0; j < size_to_replicate; ++j) {
+ res_data.push_back(data[i]);
+ }
+ }
+
+ return res;
+}
+
using ColumnBitmap = ColumnComplexType<BitmapValue>;
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_const.cpp
b/be/src/vec/columns/column_const.cpp
index 3bc07a7..efb7902 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -65,6 +65,12 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets)
const {
return ColumnConst::create(data, replicated_size);
}
+void ColumnConst::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
+ if (s == 0) return;
+ auto& res = reinterpret_cast<ColumnConst&>(column);
+ res.s = s;
+}
+
ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const {
if (limit == 0) {
limit = s;
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index e019c56..16be166 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -117,6 +117,7 @@ public:
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const
override;
ColumnPtr replicate(const Offsets& offsets) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
// ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index fbfc1b4..2cf5cc6 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -222,6 +222,20 @@ ColumnPtr ColumnDecimal<T>::replicate(const
IColumn::Offsets& offsets) const {
}
template <typename T>
+void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
+ size_t size = data.size();
+ if (0 == size) return;
+
+ auto& res = reinterpret_cast<ColumnDecimal<T>&>(column);
+ typename Self::Container& res_data = res.get_data();
+ res_data.reserve(target_size);
+
+ for (size_t i = 0; i < size; ++i) {
+ res_data.add_num_element_without_reserve(data[i], counts[i]);
+ }
+}
+
+template <typename T>
void ColumnDecimal<T>::get_extremes(Field& min, Field& max) const {
if (data.size() == 0) {
min = NearestFieldType<T>(0, scale);
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index 96a7a72..017d891 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -153,6 +153,9 @@ public:
ColumnPtr index_impl(const PaddedPODArray<Type>& indexes, size_t limit)
const;
ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
+
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
+
void get_extremes(Field& min, Field& max) const override;
MutableColumns scatter(IColumn::ColumnIndex num_columns,
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index ae3a2fd..2d946f8 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -407,6 +407,12 @@ ColumnPtr ColumnNullable::replicate(const Offsets&
offsets) const {
return ColumnNullable::create(replicated_data, replicated_null_map);
}
+void ColumnNullable::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
+ auto& res = reinterpret_cast<ColumnNullable&>(column);
+ get_nested_column().replicate(counts, target_size,
res.get_nested_column());
+ get_null_map_column().replicate(counts, target_size,
res.get_null_map_column());
+}
+
template <bool negative>
void ColumnNullable::apply_null_map_impl(const ColumnUInt8& map) {
NullMap& arr1 = get_null_map_data();
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index a369c02..2bbf558 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -121,6 +121,7 @@ public:
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
void get_extremes(Field& min, Field& max) const override;
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index f4a32dc..afd3f23 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -290,6 +290,37 @@ ColumnPtr ColumnString::replicate(const Offsets&
replicate_offsets) const {
return res;
}
+void ColumnString::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
+ size_t col_size = size();
+ if (0 == col_size) return;
+
+ auto& res = reinterpret_cast<ColumnString&>(column);
+
+ Chars& res_chars = res.chars;
+ Offsets& res_offsets = res.offsets;
+ res_chars.reserve(chars.size() / col_size * target_size);
+ res_offsets.reserve(target_size);
+
+ Offset prev_string_offset = 0;
+ Offset current_new_offset = 0;
+
+ for (size_t i = 0; i < col_size; ++i) {
+ size_t size_to_replicate = counts[i];
+ size_t string_size = offsets[i] - prev_string_offset;
+
+ for (size_t j = 0; j < size_to_replicate; ++j) {
+ current_new_offset += string_size;
+ res_offsets.push_back(current_new_offset);
+
+ res_chars.resize(res_chars.size() + string_size);
+
memcpy_small_allow_read_write_overflow15(&res_chars[res_chars.size() -
string_size],
+
&chars[prev_string_offset], string_size);
+ }
+
+ prev_string_offset = offsets[i];
+ }
+}
+
void ColumnString::reserve(size_t n) {
offsets.reserve(n);
}
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 312e1d8..2b318fa 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -238,6 +238,8 @@ public:
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
+
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override {
return scatter_impl<ColumnString>(num_columns, selector);
}
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index f640a60..aed2502 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -325,6 +325,20 @@ ColumnPtr ColumnVector<T>::replicate(const
IColumn::Offsets& offsets) const {
}
template <typename T>
+void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
+ size_t size = data.size();
+ if (size == 0) return;
+
+ auto& res = reinterpret_cast<ColumnVector<T>&>(column);
+ typename Self::Container& res_data = res.get_data();
+ res_data.reserve(target_size);
+
+ for (size_t i = 0; i < size; ++i) {
+ res_data.add_num_element_without_reserve(data[i], counts[i]);
+ }
+}
+
+template <typename T>
void ColumnVector<T>::get_extremes(Field& min, Field& max) const {
size_t size = data.size();
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index 8732032..8e06169 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -271,6 +271,8 @@ public:
ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
+
void get_extremes(Field& min, Field& max) const override;
MutableColumns scatter(IColumn::ColumnIndex num_columns,
diff --git a/be/src/vec/common/hash_table/hash_table.h
b/be/src/vec/common/hash_table/hash_table.h
index 53af823..f0a94b7 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -240,7 +240,7 @@ void insert_set_mapped(MappedType* dest, const ValueType&
src) {
/** Determines the size of the hash table, and when and how much it should be
resized.
*/
-template <size_t initial_size_degree = 8>
+template <size_t initial_size_degree = 10>
struct HashTableGrower {
/// The state of this structure is enough to get the buffer size of the
hash table.
doris::vectorized::UInt8 size_degree = initial_size_degree;
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 892b3d2..d6419fd 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -378,8 +378,8 @@ public:
template <typename U, typename... TAllocatorParams>
void add_num_element_without_reserve(U&& x, uint32_t num,
TAllocatorParams&&... allocator_params) {
- std::fill(t_end(), t_end() + num, x);
- this->c_end += sizeof(T) * num;
+ std::fill(t_end(), t_end() + num, x);
+ this->c_end += sizeof(T) * num;
}
/**
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 9563ebf..4fa5429 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -175,9 +175,8 @@ struct ProcessHashTableProbe {
KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz,
nullptr);
- IColumn::Offsets offset_data;
+ std::vector<uint32_t> items_counts(_probe_rows);
auto& mcol = mutable_block.mutable_columns();
- offset_data.assign(_probe_rows, (uint32_t)0);
int right_col_idx = _join_node->_is_right_semi_anti ? 0 :
_left_table_data_types.size();
int right_col_len = _right_table_data_types.size();
@@ -187,10 +186,12 @@ struct ProcessHashTableProbe {
// ignore null rows
if constexpr (ignore_null) {
if ((*null_map)[_probe_index]) {
- offset_data[_probe_index++] = current_offset;
+ items_counts[_probe_index++] = 0;
continue;
}
}
+
+ int repeat_count = 0;
auto find_result =
(*null_map)[_probe_index]
?
decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index,
@@ -203,7 +204,7 @@ struct ProcessHashTableProbe {
if (find_result.is_found()) {
// left semi join only need one match, do not need insert the
data of right table
if (_join_node->_join_op == TJoinOp::LEFT_SEMI_JOIN) {
- ++current_offset;
+ ++repeat_count;
} else if (_join_node->_join_op == TJoinOp::LEFT_ANTI_JOIN) {
// do nothing
} else {
@@ -215,7 +216,7 @@ struct ProcessHashTableProbe {
// right semi/anti join should dispose the data in
hash table
// after probe data eof
if (!_join_node->_is_right_semi_anti) {
- ++current_offset;
+ ++repeat_count;
for (size_t j = 0; j < right_col_len; ++j) {
auto& column =
*mapped.block->get_by_position(j).column;
mcol[j + right_col_idx]->insert_from(column,
mapped.row_num);
@@ -226,7 +227,7 @@ struct ProcessHashTableProbe {
// right semi/anti join should dispose the data in
hash table
// after probe data eof
if (!_join_node->_is_right_semi_anti) {
- ++current_offset;
+ ++repeat_count;
for (size_t j = 0; j < right_col_len; ++j) {
auto& column =
*it->block->get_by_position(j).column;
// TODO: interface insert from cause
serious performance problems
@@ -240,7 +241,7 @@ struct ProcessHashTableProbe {
}
} else if (_join_node->_match_all_probe ||
_join_node->_join_op == TJoinOp::LEFT_ANTI_JOIN) {
- ++current_offset;
+ ++repeat_count;
// only full outer / left outer need insert the data of right
table
if (_join_node->_match_all_probe) {
for (size_t j = 0; j < right_col_len; ++j) {
@@ -250,22 +251,19 @@ struct ProcessHashTableProbe {
}
}
- offset_data[_probe_index++] = current_offset;
+ items_counts[_probe_index++] = repeat_count;
+ current_offset += repeat_count;
if (current_offset >= _batch_size) {
break;
}
}
-
- for (int i = _probe_index; i < _probe_rows; ++i) {
- offset_data[i] = current_offset;
- }
- output_block->swap(mutable_block.to_block());
-
+
for (int i = 0; i < right_col_idx; ++i) {
auto& column = _probe_block.get_by_position(i).column;
- output_block->get_by_position(i).column =
column->replicate(offset_data);
+ column->replicate(items_counts.data(), current_offset, *mcol[i]);
}
+ output_block->swap(mutable_block.to_block());
return Status::OK();
}
@@ -711,11 +709,13 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
}
Status st;
- output_block->clear();
if (_probe_index < _probe_block.rows()) {
- MutableBlock
mutable_block(VectorizedUtils::create_empty_columnswithtypename(
- _have_other_join_conjunct ? _row_desc_for_other_join_conjunt :
row_desc()));
+ MutableBlock mutable_block = (output_block->mem_reuse() &&
!_have_other_join_conjunct) ?
+ MutableBlock(output_block) :
+ MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
+ !_have_other_join_conjunct ? row_desc() :
_row_desc_for_other_join_conjunt));
+
std::visit(
[&](auto&& arg) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
diff --git a/be/src/vec/functions/math.cpp b/be/src/vec/functions/math.cpp
index abff782..540c2c1 100644
--- a/be/src/vec/functions/math.cpp
+++ b/be/src/vec/functions/math.cpp
@@ -181,7 +181,7 @@ struct HexIntImpl {
// uint64_t max value 0xFFFFFFFFFFFFFFFF , 16 'F'
if (num == 0) { return {hex_table, 1};}
- size_t i = 0;
+ int i = 0;
while (num) {
ans[i++] = hex_table[num & 15];
num = num >> 4;
@@ -189,13 +189,13 @@ struct HexIntImpl {
ans[i] = '\0';
// reverse
- for (int k = 0, j = i - 1; k <= j; k++, j--) {
+ for (int k = 0, j = i - 1; k <= j && k <= 16; k++, j--) {
char tmp = ans[j];
ans[j] = ans[k];
ans[k] = tmp;
}
- return {ans, i};
+ return {ans, static_cast<size_t>(i)};
}
static Status vector(const ColumnInt64::Container& data,
ColumnString::Chars& res_data,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]