This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b35cfc5d5e [opt](join) Opt the performance of join probe (#21845)
b35cfc5d5e is described below
commit b35cfc5d5ea16f86a47e0b897bc7c9518d6290a4
Author: HappenLee <[email protected]>
AuthorDate: Wed Jul 19 01:21:22 2023 +0800
[opt](join) Opt the performance of join probe (#21845)
---
be/src/vec/columns/column.h | 3 +-
be/src/vec/columns/column_array.cpp | 33 +++++++-------
be/src/vec/columns/column_array.h | 3 +-
be/src/vec/columns/column_complex.h | 20 +++------
be/src/vec/columns/column_const.cpp | 5 +--
be/src/vec/columns/column_const.h | 3 +-
be/src/vec/columns/column_decimal.cpp | 14 +++---
be/src/vec/columns/column_decimal.h | 3 +-
be/src/vec/columns/column_nullable.cpp | 8 ++--
be/src/vec/columns/column_nullable.h | 3 +-
be/src/vec/columns/column_string.cpp | 46 ++++++++-----------
be/src/vec/columns/column_string.h | 3 +-
be/src/vec/columns/column_struct.cpp | 10 +----
be/src/vec/columns/column_struct.h | 3 +-
be/src/vec/columns/column_vector.cpp | 20 ++++-----
be/src/vec/columns/column_vector.h | 3 +-
be/src/vec/exec/join/process_hash_table_probe.h | 2 +-
.../vec/exec/join/process_hash_table_probe_impl.h | 52 ++++++++++++++--------
.../array/function_array_with_constant.cpp | 2 +-
be/test/vec/core/column_array_test.cpp | 8 ++--
20 files changed, 109 insertions(+), 135 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 0ff14a1653..84d035946d 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -488,8 +488,7 @@ public:
* If `begin` and `count_sz` specified, it means elements in range
[`begin`, `begin` + `count_sz`) will be replicated.
* If `count_sz` is -1, `begin` must be 0.
*/
- virtual void replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin = 0, int count_sz = -1) const {
+ virtual void replicate(const uint32_t* indexs, size_t target_size,
IColumn& column) const {
LOG(FATAL) << "not support";
}
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index b575f7bf15..94ec98c104 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -864,32 +864,29 @@ ColumnPtr ColumnArray::replicate(const IColumn::Offsets&
replicate_offsets) cons
return replicate_generic(replicate_offsets);
}
-void ColumnArray::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
- size_t col_size = count_sz < 0 ? size() : count_sz;
- if (col_size == 0) {
+void ColumnArray::replicate(const uint32_t* indexs, size_t target_size,
IColumn& column) const {
+ if (target_size == 0) {
return;
}
+ auto total_size = get_offsets().size();
//
|---------------------|-------------------------|-------------------------|
// [0, begin) [begin, begin + count_sz) [begin + count_sz,
size())
// do not need to copy copy counts[n] times do not need to copy
- IColumn::Offsets replicate_offsets(get_offsets().size(), 0);
- size_t cur_offset = 0;
- size_t end = begin + col_size;
+ IColumn::Offsets replicate_offsets(total_size, 0);
// copy original data at offset n counts[n] times
- for (size_t i = begin; i < end; ++i) {
- cur_offset += counts[i];
- replicate_offsets[i] = cur_offset;
- }
- // ignored
- for (size_t i = end; i < size(); ++i) {
- replicate_offsets[i] = replicate_offsets[i - 1];
+ auto begin = 0, end = 0;
+ while (begin < target_size) {
+ while (end < target_size && indexs[begin] == indexs[end]) {
+ end++;
+ }
+ long index = indexs[begin];
+ replicate_offsets[index] = end - begin;
+ begin = end;
}
- if (cur_offset != target_size) {
- LOG(WARNING) << "ColumnArray replicate input target_size:" <<
target_size
- << " not equal SUM(counts):" << cur_offset;
- return;
+ // ignored
+ for (size_t i = 1; i < total_size; ++i) {
+ replicate_offsets[i] += replicate_offsets[i - 1];
}
auto rep_res = replicate(replicate_offsets);
diff --git a/be/src/vec/columns/column_array.h
b/be/src/vec/columns/column_array.h
index 4fe1827e17..18bdc74bc1 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -177,8 +177,7 @@ public:
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const
override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
ColumnPtr convert_to_full_column_if_const() const override;
void get_extremes(Field& min, Field& max) const override {
LOG(FATAL) << "get_extremes not implemented";
diff --git a/be/src/vec/columns/column_complex.h
b/be/src/vec/columns/column_complex.h
index 138f3d0fb3..211de96df6 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -271,8 +271,7 @@ public:
ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const
override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* indexs, size_t target_size, IColumn&
column) const override;
[[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
const IColumn::Selector& selector)
const override {
@@ -410,21 +409,14 @@ ColumnPtr ColumnComplexType<T>::replicate(const
IColumn::Offsets& offsets) const
}
template <typename T>
-void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t
target_size, IColumn& column,
- size_t begin, int count_sz) const {
- size_t size = count_sz < 0 ? data.size() : count_sz;
- if (0 == size) return;
-
+void ColumnComplexType<T>::replicate(const uint32_t* indexs, size_t
target_size,
+ IColumn& column) const {
auto& res = reinterpret_cast<ColumnComplexType<T>&>(column);
typename Self::Container& res_data = res.get_data();
- res_data.reserve(target_size);
+ res_data.resize(target_size);
- size_t end = size + begin;
- for (size_t i = begin; i < end; ++i) {
- size_t size_to_replicate = counts[i];
- for (size_t j = 0; j < size_to_replicate; ++j) {
- res_data.push_back(data[i]);
- }
+ for (size_t i = 0; i < target_size; ++i) {
+ res_data[i] = data[indexs[i]];
}
}
diff --git a/be/src/vec/columns/column_const.cpp
b/be/src/vec/columns/column_const.cpp
index 13124dbd23..d8dbae40f4 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -78,11 +78,10 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets)
const {
return ColumnConst::create(data, replicated_size);
}
-void ColumnConst::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
+void ColumnConst::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
if (s == 0) return;
auto& res = reinterpret_cast<ColumnConst&>(column);
- res.s = s;
+ res.s = target_size;
}
ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const {
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 7554e773b9..3f33ee5792 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -191,8 +191,7 @@ public:
size_t filter(const Filter& filter) override;
ColumnPtr replicate(const Offsets& offsets) const override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* indexs, size_t target_size, IColumn&
column) const override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
// ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index a73be249eb..2c98e1193b 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -440,18 +440,14 @@ ColumnPtr ColumnDecimal<T>::replicate(const
IColumn::Offsets& offsets) const {
}
template <typename T>
-void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
- size_t size = count_sz < 0 ? data.size() : count_sz;
- if (0 == size) return;
-
+void ColumnDecimal<T>::replicate(const uint32_t* __restrict indexs, size_t
target_size,
+ IColumn& column) const {
auto& res = reinterpret_cast<ColumnDecimal<T>&>(column);
typename Self::Container& res_data = res.get_data();
- res_data.reserve(target_size);
+ res_data.resize(target_size);
- size_t end = size + begin;
- for (size_t i = begin; i < end; ++i) {
- res_data.add_num_element_without_reserve(data[i], counts[i]);
+ for (size_t i = 0; i < target_size; ++i) {
+ res_data[i] = data[indexs[i]];
}
}
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index 404c8d7019..aae9ab94d1 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -227,8 +227,7 @@ public:
ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* indexs, size_t target_size, IColumn&
column) const override;
TypeIndex get_data_type() const override { return TypeId<T>::value; }
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index 538bcd27a6..bda5029c31 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -578,12 +578,10 @@ ColumnPtr ColumnNullable::replicate(const Offsets&
offsets) const {
return ColumnNullable::create(replicated_data, replicated_null_map);
}
-void ColumnNullable::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
+void ColumnNullable::replicate(const uint32_t* counts, size_t target_size,
IColumn& column) const {
auto& res = reinterpret_cast<ColumnNullable&>(column);
- get_nested_column().replicate(counts, target_size,
res.get_nested_column(), begin, count_sz);
- get_null_map_column().replicate(counts, target_size,
res.get_null_map_column(), begin,
- count_sz);
+ get_nested_column().replicate(counts, target_size,
res.get_nested_column());
+ get_null_map_column().replicate(counts, target_size,
res.get_null_map_column());
}
template <bool negative>
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 11c24be294..c3ee24600e 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -213,8 +213,7 @@ public:
size_t allocated_bytes() const override;
void protect() override;
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
void update_xxHash_with_value(size_t start, size_t end, uint64_t& hash,
const uint8_t* __restrict null_data) const
override;
void update_crc_with_value(size_t start, size_t end, uint64_t& hash,
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index 3277e62894..4ea25b7688 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -446,39 +446,29 @@ ColumnPtr ColumnString::replicate(const Offsets&
replicate_offsets) const {
return res;
}
-void ColumnString::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
- size_t col_size = count_sz < 0 ? size() : count_sz;
- if (0 == col_size) {
- return;
- }
-
+void ColumnString::replicate(const uint32_t* indexs, size_t target_size,
IColumn& column) const {
auto& res = reinterpret_cast<ColumnString&>(column);
Chars& res_chars = res.chars;
Offsets& res_offsets = res.offsets;
- res_chars.reserve(chars.size() / col_size * target_size);
- res_offsets.reserve(target_size);
-
- size_t base = begin > 0 ? offsets[begin - 1] : 0;
- Offset prev_string_offset = 0 + base;
- Offset current_new_offset = 0;
-
- size_t end = begin + col_size;
- for (size_t i = begin; i < end; ++i) {
- size_t size_to_replicate = counts[i];
- size_t string_size = offsets[i] - prev_string_offset;
-
- for (size_t j = 0; j < size_to_replicate; ++j) {
- current_new_offset += string_size;
- res_offsets.push_back(current_new_offset);
- res_chars.resize(res_chars.size() + string_size);
-
memcpy_small_allow_read_write_overflow15(&res_chars[res_chars.size() -
string_size],
-
&chars[prev_string_offset], string_size);
- }
-
- prev_string_offset = offsets[i];
+ size_t byte_size = 0;
+ res_offsets.resize(target_size);
+ for (size_t i = 0; i < target_size; ++i) {
+ long row_idx = indexs[i];
+ auto str_size = offsets[row_idx] - offsets[row_idx - 1];
+ res_offsets[i] = res_offsets[i - 1] + str_size;
+ byte_size += str_size;
+ }
+
+ res_chars.resize(byte_size);
+ auto* __restrict dest = res.chars.data();
+ auto* __restrict src = chars.data();
+ for (size_t i = 0; i < target_size; ++i) {
+ long row_idx = indexs[i];
+ auto str_size = offsets[row_idx] - offsets[row_idx - 1];
+ memcpy_small_allow_read_write_overflow15(dest + res_offsets[i - 1],
+ src + offsets[row_idx - 1],
str_size);
}
check_chars_length(res_chars.size(), res_offsets.size());
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index ac95f78037..14c426c762 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -506,8 +506,7 @@ public:
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* indexs, size_t target_size, IColumn&
column) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override {
return scatter_impl<ColumnString>(num_columns, selector);
diff --git a/be/src/vec/columns/column_struct.cpp
b/be/src/vec/columns/column_struct.cpp
index 0b3bcb24e8..78c438a5e1 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -294,18 +294,12 @@ ColumnPtr ColumnStruct::replicate(const Offsets& offsets)
const {
return ColumnStruct::create(new_columns);
}
-void ColumnStruct::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
- size_t col_size = count_sz < 0 ? size() : count_sz;
- if (0 == col_size) {
- return;
- }
-
+void ColumnStruct::replicate(const uint32_t* indexs, size_t target_size,
IColumn& column) const {
auto& res = reinterpret_cast<ColumnStruct&>(column);
res.columns.resize(columns.size());
for (size_t i = 0; i != columns.size(); ++i) {
- columns[i]->replicate(counts, target_size, *res.columns[i], begin,
count_sz);
+ columns[i]->replicate(indexs, target_size, *res.columns[i]);
}
}
diff --git a/be/src/vec/columns/column_struct.h
b/be/src/vec/columns/column_struct.h
index 9073725e81..700b5801c3 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -143,8 +143,7 @@ public:
Status filter_by_selector(const uint16_t* sel, size_t sel_size, IColumn*
col_ptr) override;
ColumnPtr permute(const Permutation& perm, size_t limit) const override;
ColumnPtr replicate(const Offsets& offsets) const override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override;
// ColumnPtr index(const IColumn & indexes, size_t limit) const override;
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index 388b436bc5..82555638d7 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -545,18 +545,18 @@ ColumnPtr ColumnVector<T>::replicate(const
IColumn::Offsets& offsets) const {
}
template <typename T>
-void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size,
IColumn& column,
- size_t begin, int count_sz) const {
- size_t size = count_sz < 0 ? data.size() : count_sz;
- if (size == 0) return;
-
+void ColumnVector<T>::replicate(const uint32_t* __restrict indexs, size_t
target_size,
+ IColumn& column) const {
auto& res = reinterpret_cast<ColumnVector<T>&>(column);
typename Self::Container& res_data = res.get_data();
- res_data.reserve(target_size);
-
- size_t end = begin + size;
- for (size_t i = begin; i < end; ++i) {
- res_data.add_num_element_without_reserve(data[i], counts[i]);
+ DCHECK(res_data.empty());
+ res_data.resize(target_size);
+ auto* __restrict left = res_data.data();
+ auto* __restrict right = data.data();
+ auto* __restrict idxs = indexs;
+
+ for (size_t i = 0; i < target_size; ++i) {
+ left[i] = right[idxs[i]];
}
}
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index 48228822f3..b8c119a217 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -425,8 +425,7 @@ public:
ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
- void replicate(const uint32_t* counts, size_t target_size, IColumn&
column, size_t begin = 0,
- int count_sz = -1) const override;
+ void replicate(const uint32_t* indexs, size_t target_size, IColumn&
column) const override;
void get_extremes(Field& min, Field& max) const override;
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index db43daf20d..79e126584c 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -88,7 +88,7 @@ struct ProcessHashTableProbe {
std::unique_ptr<Arena> _arena;
std::vector<StringRef> _probe_keys;
- std::vector<uint32_t> _items_counts;
+ std::vector<uint32_t> _probe_indexs;
std::vector<int8_t> _build_block_offsets;
std::vector<int> _build_block_rows;
std::vector<std::pair<int8_t, int>> _build_blocks_locs;
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 4e9180bed9..d859632701 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -136,8 +136,8 @@ void
ProcessHashTableProbe<JoinOpType>::probe_side_output_column(
if (all_match_one) {
mcol[i]->insert_range_from(*column, last_probe_index,
probe_size);
} else {
- DCHECK_GE(_items_counts.size(), last_probe_index + probe_size);
- column->replicate(&_items_counts[0], size, *mcol[i],
last_probe_index, probe_size);
+ DCHECK_GE(_probe_indexs.size(), last_probe_index + probe_size);
+ column->replicate(&_probe_indexs[0], size, *mcol[i]);
}
} else {
mcol[i]->insert_many_defaults(size);
@@ -216,10 +216,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
bool is_mark_join) {
auto& probe_index = _join_node->_probe_index;
auto& probe_raw_ptrs = _join_node->_probe_columns;
- if (probe_index == 0 && _items_counts.size() < probe_rows) {
- _items_counts.resize(probe_rows);
- }
+ _probe_indexs.resize(_batch_size);
if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) {
_build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
_build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
@@ -264,13 +262,14 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if (LIKELY(current_offset < _build_block_rows.size())) {
_build_block_offsets[current_offset] =
probe_row_match_iter->block_offset;
_build_block_rows[current_offset] =
probe_row_match_iter->row_num;
+ _probe_indexs[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
+ _probe_indexs.template emplace_back(probe_index);
}
++current_offset;
}
- _items_counts[probe_index] = current_offset;
all_match_one &= (current_offset == 1);
if (!probe_row_match_iter.ok()) {
++probe_index;
@@ -284,19 +283,19 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
if constexpr (probe_all) {
- _items_counts[probe_index++] = (uint32_t)1;
// only full outer / left outer need insert the
data of right table
if (LIKELY(current_offset <
_build_block_rows.size())) {
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
+ _probe_indexs[current_offset] = probe_index;
} else {
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
+ _probe_indexs.template
emplace_back(probe_index);
}
++current_offset;
- } else {
- _items_counts[probe_index++] = (uint32_t)0;
}
+ probe_index++;
all_match_one = false;
if constexpr (probe_all) {
if (current_offset >= _batch_size) {
@@ -415,7 +414,16 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
}
uint32_t count = (uint32_t)(current_offset - last_offset);
- _items_counts[current_probe_index] = count;
+ if (LIKELY(current_offset < _probe_indexs.size())) {
+ for (int i = last_offset; i < current_offset; ++i) {
+ _probe_indexs[i] = current_probe_index;
+ }
+ } else {
+ for (int i = last_offset; i < _probe_indexs.size(); ++i) {
+ _probe_indexs[i] = current_probe_index;
+ }
+ _probe_indexs.resize(current_offset, current_probe_index);
+ }
all_match_one &= (count == 1);
if (current_offset >= _batch_size) {
break;
@@ -451,10 +459,8 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
Block* output_block, size_t probe_rows, bool is_mark_join) {
auto& probe_index = _join_node->_probe_index;
auto& probe_raw_ptrs = _join_node->_probe_columns;
- if (probe_index == 0 && _items_counts.size() < probe_rows) {
- _items_counts.resize(probe_rows);
- }
if (_build_block_rows.size() < probe_rows * PROBE_SIDE_EXPLODE_RATE) {
+ _probe_indexs.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
_build_block_rows.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
_build_block_offsets.resize(probe_rows * PROBE_SIDE_EXPLODE_RATE);
}
@@ -501,9 +507,11 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
for (; probe_row_match_iter.ok() && current_offset < _batch_size;
++probe_row_match_iter) {
if (LIKELY(current_offset < _build_block_rows.size())) {
+ _probe_indexs[current_offset] = probe_index;
_build_block_offsets[current_offset] =
probe_row_match_iter->block_offset;
_build_block_rows[current_offset] =
probe_row_match_iter->row_num;
} else {
+ _probe_indexs.template emplace_back(probe_index);
_build_block_offsets.emplace_back(probe_row_match_iter->block_offset);
_build_block_rows.emplace_back(probe_row_match_iter->row_num);
}
@@ -517,7 +525,6 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
row_count_from_last_probe = current_offset;
all_match_one &= (current_offset == 1);
- _items_counts[probe_index] = current_offset;
if (!probe_row_match_iter.ok()) {
++probe_index;
is_the_last_sub_block = true;
@@ -532,21 +539,21 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
if constexpr (ignore_null && need_null_map_for_probe) {
if ((*null_map)[probe_index]) {
if constexpr (probe_all) {
- _items_counts[probe_index++] = (uint32_t)1;
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the
data of right table
if (LIKELY(current_offset <
_build_block_rows.size())) {
+ _probe_indexs[current_offset] = probe_index;
_build_block_offsets[current_offset] = -1;
_build_block_rows[current_offset] = -1;
} else {
+ _probe_indexs.template
emplace_back(probe_index);
_build_block_offsets.emplace_back(-1);
_build_block_rows.emplace_back(-1);
}
++current_offset;
- } else {
- _items_counts[probe_index++] = (uint32_t)0;
}
+ probe_index++;
all_match_one = false;
if constexpr (probe_all) {
if (current_offset >= _batch_size) {
@@ -676,7 +683,16 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
++probe_index;
}
uint32_t count = (uint32_t)(current_offset - last_offset);
- _items_counts[current_probe_index] = count;
+ if (LIKELY(current_offset < _probe_indexs.size())) {
+ for (int i = last_offset; i < current_offset; ++i) {
+ _probe_indexs[i] = current_probe_index;
+ }
+ } else {
+ for (int i = last_offset; i < _probe_indexs.size(); ++i) {
+ _probe_indexs[i] = current_probe_index;
+ }
+ _probe_indexs.resize(current_offset, current_probe_index);
+ }
all_match_one &= (count == 1);
if (current_offset >= _batch_size) {
break;
diff --git a/be/src/vec/functions/array/function_array_with_constant.cpp
b/be/src/vec/functions/array/function_array_with_constant.cpp
index e1a0c4f15e..9e3885b1c3 100644
--- a/be/src/vec/functions/array/function_array_with_constant.cpp
+++ b/be/src/vec/functions/array/function_array_with_constant.cpp
@@ -98,7 +98,7 @@ public:
}
offset += array_size;
offsets.push_back(offset);
- array_sizes.push_back(array_size);
+ array_sizes.resize(array_sizes.size() + array_size, i);
}
auto clone = value->clone_empty();
clone->reserve(input_rows_count);
diff --git a/be/test/vec/core/column_array_test.cpp
b/be/test/vec/core/column_array_test.cpp
index a87bea1cb8..fd2ed21273 100644
--- a/be/test/vec/core/column_array_test.cpp
+++ b/be/test/vec/core/column_array_test.cpp
@@ -200,8 +200,8 @@ TEST(ColumnArrayTest, IntArrayReplicateTest) {
}
ColumnArray array_column(std::move(data_column), std::move(off_column));
- uint32_t counts[] = {2, 1, 0, 3}; // size should be equal
array_column.size()
- size_t target_size = 6; // sum(counts)
+ uint32_t counts[] = {0, 0, 1, 3, 3, 3}; // size should be equal
array_column.size()
+ size_t target_size = 6; // sum(counts)
// return array column: [[1,2,3],[1,2,3],[],[5,6],[5,6],[5,6]];
auto res1 = array_column.clone_empty();
@@ -224,8 +224,8 @@ TEST(ColumnArrayTest, StringArrayReplicateTest) {
}
ColumnArray array_column(std::move(data_column), std::move(off_column));
- uint32_t counts[] = {2, 1, 0, 3}; // size should be equal
array_column.size()
- size_t target_size = 6; // sum(counts)
+ uint32_t counts[] = {0, 0, 1, 3, 3, 3}; // size should be equal
array_column.size()
+ size_t target_size = 6; // sum(counts)
// return array column: [["abc","d"],["abc","d"],["ef"],[""],[""],[""]];
auto res1 = array_column.clone_empty();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]