This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 358bd79  [improvement](vec)(Join) Mem reuse to speed up join operator (#7905)
358bd79 is described below

commit 358bd79fb10e571a9fcf54a8726ad9906cdec0e3
Author: HappenLee <[email protected]>
AuthorDate: Mon Jan 31 22:14:12 2022 +0800

    [improvement](vec)(Join) Mem reuse to speed up join operator (#7905)
    
    1. Reuse the memory of the output block in the vectorized hash join node
    2. Add a `replicate(counts, target_size, column)` overload to the column interface
---
 be/src/vec/columns/column.h               |  4 ++++
 be/src/vec/columns/column_complex.h       | 21 ++++++++++++++++++
 be/src/vec/columns/column_const.cpp       |  6 ++++++
 be/src/vec/columns/column_const.h         |  1 +
 be/src/vec/columns/column_decimal.cpp     | 14 ++++++++++++
 be/src/vec/columns/column_decimal.h       |  3 +++
 be/src/vec/columns/column_nullable.cpp    |  6 ++++++
 be/src/vec/columns/column_nullable.h      |  1 +
 be/src/vec/columns/column_string.cpp      | 31 ++++++++++++++++++++++++++
 be/src/vec/columns/column_string.h        |  2 ++
 be/src/vec/columns/column_vector.cpp      | 14 ++++++++++++
 be/src/vec/columns/column_vector.h        |  2 ++
 be/src/vec/common/hash_table/hash_table.h |  2 +-
 be/src/vec/common/pod_array.h             |  4 ++--
 be/src/vec/exec/join/vhash_join_node.cpp  | 36 +++++++++++++++----------------
 be/src/vec/functions/math.cpp             |  6 +++---
 16 files changed, 129 insertions(+), 24 deletions(-)

diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index c730168..3edd25a 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -267,6 +267,10 @@ public:
     using Offsets = PaddedPODArray<Offset>;
     virtual Ptr replicate(const Offsets& offsets) const = 0;
 
+    virtual void replicate(const uint32_t* counts, size_t target_size, 
IColumn& column) const {
+        LOG(FATAL) << "not support";
+    };
+
     /** Split column to smaller columns. Each value goes to column index, 
selected by corresponding element of 'selector'.
       * Selector must contain values from 0 to num_columns - 1.
       * For default implementation, see scatter_impl.
diff --git a/be/src/vec/columns/column_complex.h 
b/be/src/vec/columns/column_complex.h
index 76ad270..576388f 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -191,6 +191,8 @@ public:
 
     ColumnPtr replicate(const IColumn::Offsets& replicate_offsets) const 
override;
 
+    void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
+
     [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
                                         const IColumn::Selector& selector) 
const override {
         LOG(FATAL) << "scatter not implemented";
@@ -298,5 +300,24 @@ ColumnPtr ColumnComplexType<T>::replicate(const 
IColumn::Offsets& offsets) const
     return res;
 }
 
+template <typename T>
+void ColumnComplexType<T>::replicate(const uint32_t* counts, size_t 
target_size, IColumn& column) const {
+    size_t size = data.size();
+    if (0 == size) return;
+
+    auto& res = reinterpret_cast<ColumnComplexType<T>&>(column);
+    typename Self::Container& res_data = res.get_data();
+    res_data.reserve(target_size);
+
+    for (size_t i = 0; i < size; ++i) {
+        size_t size_to_replicate = counts[i];
+        for (size_t j = 0; j < size_to_replicate; ++j) {
+            res_data.push_back(data[i]);
+        }
+    }
+
+    return res;
+}
+
 using ColumnBitmap = ColumnComplexType<BitmapValue>;
 } // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_const.cpp 
b/be/src/vec/columns/column_const.cpp
index 3bc07a7..efb7902 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -65,6 +65,12 @@ ColumnPtr ColumnConst::replicate(const Offsets& offsets) 
const {
     return ColumnConst::create(data, replicated_size);
 }
 
+void ColumnConst::replicate(const uint32_t* counts, size_t target_size, 
IColumn& column) const {
+    if (s == 0) return;
+    auto& res = reinterpret_cast<ColumnConst&>(column);
+    res.s = s;
+}
+
 ColumnPtr ColumnConst::permute(const Permutation& perm, size_t limit) const {
     if (limit == 0) {
         limit = s;
diff --git a/be/src/vec/columns/column_const.h 
b/be/src/vec/columns/column_const.h
index e019c56..16be166 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -117,6 +117,7 @@ public:
 
     ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const 
override;
     ColumnPtr replicate(const Offsets& offsets) const override;
+    void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
     ColumnPtr permute(const Permutation& perm, size_t limit) const override;
     // ColumnPtr index(const IColumn & indexes, size_t limit) const override;
     void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_decimal.cpp 
b/be/src/vec/columns/column_decimal.cpp
index fbfc1b4..2cf5cc6 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -222,6 +222,20 @@ ColumnPtr ColumnDecimal<T>::replicate(const 
IColumn::Offsets& offsets) const {
 }
 
 template <typename T>
+void ColumnDecimal<T>::replicate(const uint32_t* counts, size_t target_size, 
IColumn& column) const {
+    size_t size = data.size();
+    if (0 == size) return;
+
+    auto& res = reinterpret_cast<ColumnDecimal<T>&>(column);
+    typename Self::Container& res_data = res.get_data();
+    res_data.reserve(target_size);
+
+    for (size_t i = 0; i < size; ++i) {
+        res_data.add_num_element_without_reserve(data[i], counts[i]);
+    }
+}
+
+template <typename T>
 void ColumnDecimal<T>::get_extremes(Field& min, Field& max) const {
     if (data.size() == 0) {
         min = NearestFieldType<T>(0, scale);
diff --git a/be/src/vec/columns/column_decimal.h 
b/be/src/vec/columns/column_decimal.h
index 96a7a72..017d891 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -153,6 +153,9 @@ public:
     ColumnPtr index_impl(const PaddedPODArray<Type>& indexes, size_t limit) 
const;
 
     ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
+
+    void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
+
     void get_extremes(Field& min, Field& max) const override;
 
     MutableColumns scatter(IColumn::ColumnIndex num_columns,
diff --git a/be/src/vec/columns/column_nullable.cpp 
b/be/src/vec/columns/column_nullable.cpp
index ae3a2fd..2d946f8 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -407,6 +407,12 @@ ColumnPtr ColumnNullable::replicate(const Offsets& 
offsets) const {
     return ColumnNullable::create(replicated_data, replicated_null_map);
 }
 
+void ColumnNullable::replicate(const uint32_t* counts, size_t target_size, 
IColumn& column) const {
+    auto& res = reinterpret_cast<ColumnNullable&>(column);
+    get_nested_column().replicate(counts, target_size, 
res.get_nested_column());
+    get_null_map_column().replicate(counts, target_size, 
res.get_null_map_column());
+}
+
 template <bool negative>
 void ColumnNullable::apply_null_map_impl(const ColumnUInt8& map) {
     NullMap& arr1 = get_null_map_data();
diff --git a/be/src/vec/columns/column_nullable.h 
b/be/src/vec/columns/column_nullable.h
index a369c02..2bbf558 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -121,6 +121,7 @@ public:
     size_t allocated_bytes() const override;
     void protect() override;
     ColumnPtr replicate(const Offsets& replicate_offsets) const override;
+    void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
     void update_hash_with_value(size_t n, SipHash& hash) const override;
     void get_extremes(Field& min, Field& max) const override;
 
diff --git a/be/src/vec/columns/column_string.cpp 
b/be/src/vec/columns/column_string.cpp
index f4a32dc..afd3f23 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -290,6 +290,37 @@ ColumnPtr ColumnString::replicate(const Offsets& 
replicate_offsets) const {
     return res;
 }
 
+void ColumnString::replicate(const uint32_t* counts, size_t target_size, 
IColumn& column) const {
+    size_t col_size = size();
+    if (0 == col_size) return;
+
+    auto& res = reinterpret_cast<ColumnString&>(column);
+
+    Chars& res_chars = res.chars;
+    Offsets& res_offsets = res.offsets;
+    res_chars.reserve(chars.size() / col_size * target_size);
+    res_offsets.reserve(target_size);
+
+    Offset prev_string_offset = 0;
+    Offset current_new_offset = 0;
+
+    for (size_t i = 0; i < col_size; ++i) {
+        size_t size_to_replicate = counts[i];
+        size_t string_size = offsets[i] - prev_string_offset;
+
+        for (size_t j = 0; j < size_to_replicate; ++j) {
+            current_new_offset += string_size;
+            res_offsets.push_back(current_new_offset);
+
+            res_chars.resize(res_chars.size() + string_size);
+            
memcpy_small_allow_read_write_overflow15(&res_chars[res_chars.size() - 
string_size],
+                                                     
&chars[prev_string_offset], string_size);
+        }
+
+        prev_string_offset = offsets[i];
+    }
+}
+
 void ColumnString::reserve(size_t n) {
     offsets.reserve(n);
 }
diff --git a/be/src/vec/columns/column_string.h 
b/be/src/vec/columns/column_string.h
index 312e1d8..2b318fa 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -238,6 +238,8 @@ public:
 
     ColumnPtr replicate(const Offsets& replicate_offsets) const override;
 
+    void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
+
     MutableColumns scatter(ColumnIndex num_columns, const Selector& selector) 
const override {
         return scatter_impl<ColumnString>(num_columns, selector);
     }
diff --git a/be/src/vec/columns/column_vector.cpp 
b/be/src/vec/columns/column_vector.cpp
index f640a60..aed2502 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -325,6 +325,20 @@ ColumnPtr ColumnVector<T>::replicate(const 
IColumn::Offsets& offsets) const {
 }
 
 template <typename T>
+void ColumnVector<T>::replicate(const uint32_t* counts, size_t target_size, 
IColumn& column) const {
+    size_t size = data.size();
+    if (size == 0) return;
+
+    auto& res = reinterpret_cast<ColumnVector<T>&>(column);
+    typename Self::Container& res_data = res.get_data();
+    res_data.reserve(target_size);
+
+    for (size_t i = 0; i < size; ++i) {
+        res_data.add_num_element_without_reserve(data[i], counts[i]);
+    }
+}
+
+template <typename T>
 void ColumnVector<T>::get_extremes(Field& min, Field& max) const {
     size_t size = data.size();
 
diff --git a/be/src/vec/columns/column_vector.h 
b/be/src/vec/columns/column_vector.h
index 8732032..8e06169 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -271,6 +271,8 @@ public:
 
     ColumnPtr replicate(const IColumn::Offsets& offsets) const override;
 
+    void replicate(const uint32_t* counts, size_t target_size, IColumn& 
column) const override;
+
     void get_extremes(Field& min, Field& max) const override;
 
     MutableColumns scatter(IColumn::ColumnIndex num_columns,
diff --git a/be/src/vec/common/hash_table/hash_table.h 
b/be/src/vec/common/hash_table/hash_table.h
index 53af823..f0a94b7 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -240,7 +240,7 @@ void insert_set_mapped(MappedType* dest, const ValueType& 
src) {
 
 /** Determines the size of the hash table, and when and how much it should be 
resized.
   */
-template <size_t initial_size_degree = 8>
+template <size_t initial_size_degree = 10>
 struct HashTableGrower {
     /// The state of this structure is enough to get the buffer size of the 
hash table.
     doris::vectorized::UInt8 size_degree = initial_size_degree;
diff --git a/be/src/vec/common/pod_array.h b/be/src/vec/common/pod_array.h
index 892b3d2..d6419fd 100644
--- a/be/src/vec/common/pod_array.h
+++ b/be/src/vec/common/pod_array.h
@@ -378,8 +378,8 @@ public:
 
     template <typename U, typename... TAllocatorParams>
     void add_num_element_without_reserve(U&& x, uint32_t num, 
TAllocatorParams&&... allocator_params) {
-            std::fill(t_end(), t_end() + num, x);
-            this->c_end += sizeof(T) * num;
+        std::fill(t_end(), t_end() + num, x);
+        this->c_end += sizeof(T) * num;
     }
 
     /**
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 9563ebf..4fa5429 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -175,9 +175,8 @@ struct ProcessHashTableProbe {
 
         KeyGetter key_getter(_probe_raw_ptrs, _join_node->_probe_key_sz, 
nullptr);
 
-        IColumn::Offsets offset_data;
+        std::vector<uint32_t> items_counts(_probe_rows);
         auto& mcol = mutable_block.mutable_columns();
-        offset_data.assign(_probe_rows, (uint32_t)0);
 
         int right_col_idx = _join_node->_is_right_semi_anti ? 0 : 
_left_table_data_types.size();
         int right_col_len = _right_table_data_types.size();
@@ -187,10 +186,12 @@ struct ProcessHashTableProbe {
             // ignore null rows
             if constexpr (ignore_null) {
                 if ((*null_map)[_probe_index]) {
-                    offset_data[_probe_index++] = current_offset;
+                    items_counts[_probe_index++] = 0;
                     continue;
                 }
             }
+
+            int repeat_count = 0;
             auto find_result =
                     (*null_map)[_probe_index]
                             ? 
decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index,
@@ -203,7 +204,7 @@ struct ProcessHashTableProbe {
             if (find_result.is_found()) {
                 // left semi join only need one match, do not need insert the 
data of right table
                 if (_join_node->_join_op == TJoinOp::LEFT_SEMI_JOIN) {
-                    ++current_offset;
+                    ++repeat_count;
                 } else if (_join_node->_join_op == TJoinOp::LEFT_ANTI_JOIN) {
                     // do nothing
                 } else {
@@ -215,7 +216,7 @@ struct ProcessHashTableProbe {
                         // right semi/anti join should dispose the data in 
hash table
                         // after probe data eof
                         if (!_join_node->_is_right_semi_anti) {
-                            ++current_offset;
+                            ++repeat_count;
                             for (size_t j = 0; j < right_col_len; ++j) {
                                 auto& column = 
*mapped.block->get_by_position(j).column;
                                 mcol[j + right_col_idx]->insert_from(column, 
mapped.row_num);
@@ -226,7 +227,7 @@ struct ProcessHashTableProbe {
                             // right semi/anti join should dispose the data in 
hash table
                             // after probe data eof
                             if (!_join_node->_is_right_semi_anti) {
-                                ++current_offset;
+                                ++repeat_count;
                                 for (size_t j = 0; j < right_col_len; ++j) {
                                     auto& column = 
*it->block->get_by_position(j).column;
                                     // TODO: interface insert from cause 
serious performance problems
@@ -240,7 +241,7 @@ struct ProcessHashTableProbe {
                 }
             } else if (_join_node->_match_all_probe ||
                        _join_node->_join_op == TJoinOp::LEFT_ANTI_JOIN) {
-                ++current_offset;
+                ++repeat_count;
                 // only full outer / left outer need insert the data of right 
table
                 if (_join_node->_match_all_probe) {
                     for (size_t j = 0; j < right_col_len; ++j) {
@@ -250,22 +251,19 @@ struct ProcessHashTableProbe {
                 }
             }
 
-            offset_data[_probe_index++] = current_offset;
+            items_counts[_probe_index++] = repeat_count;
+            current_offset += repeat_count;
 
             if (current_offset >= _batch_size) {
                 break;
             }
         }
-
-        for (int i = _probe_index; i < _probe_rows; ++i) {
-            offset_data[i] = current_offset;
-        }
-        output_block->swap(mutable_block.to_block());
-
+        
         for (int i = 0; i < right_col_idx; ++i) {
             auto& column = _probe_block.get_by_position(i).column;
-            output_block->get_by_position(i).column = 
column->replicate(offset_data);
+            column->replicate(items_counts.data(), current_offset, *mcol[i]);
         }
+        output_block->swap(mutable_block.to_block());
 
         return Status::OK();
     }
@@ -711,11 +709,13 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* 
output_block, bool* eo
     }
 
     Status st;
-    output_block->clear();
 
     if (_probe_index < _probe_block.rows()) {
-        MutableBlock 
mutable_block(VectorizedUtils::create_empty_columnswithtypename(
-                _have_other_join_conjunct ? _row_desc_for_other_join_conjunt : 
row_desc()));
+        MutableBlock mutable_block = (output_block->mem_reuse() && 
!_have_other_join_conjunct) ?
+                MutableBlock(output_block) :
+                MutableBlock(VectorizedUtils::create_empty_columnswithtypename(
+                        !_have_other_join_conjunct ? row_desc() : 
_row_desc_for_other_join_conjunt));
+
         std::visit(
                 [&](auto&& arg) {
                     using HashTableCtxType = std::decay_t<decltype(arg)>;
diff --git a/be/src/vec/functions/math.cpp b/be/src/vec/functions/math.cpp
index abff782..540c2c1 100644
--- a/be/src/vec/functions/math.cpp
+++ b/be/src/vec/functions/math.cpp
@@ -181,7 +181,7 @@ struct HexIntImpl {
         // uint64_t max value 0xFFFFFFFFFFFFFFFF , 16 'F'
         if (num == 0) { return {hex_table, 1};}
 
-        size_t i = 0;
+        int i = 0;
         while (num) {
             ans[i++] = hex_table[num & 15];
             num = num >> 4;
@@ -189,13 +189,13 @@ struct HexIntImpl {
         ans[i] = '\0';
 
         // reverse
-        for (int k = 0, j = i - 1; k <= j; k++, j--) {
+        for (int k = 0, j = i - 1; k <= j && k <= 16; k++, j--) {
             char tmp = ans[j];
             ans[j] = ans[k];
             ans[k] = tmp;
         }
 
-        return {ans, i};
+        return {ans, static_cast<size_t>(i)};
     }
     
     static Status vector(const ColumnInt64::Container& data, 
ColumnString::Chars& res_data,

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to