This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c7f758ef7 [improvement](hashjoin) support partitioned hash table in 
hash join (#14480)
6c7f758ef7 is described below

commit 6c7f758ef7d2db6d097b8c098702d7a2e0f2fbf5
Author: TengJianPing <[email protected]>
AuthorDate: Thu Nov 24 14:16:47 2022 +0800

    [improvement](hashjoin) support partitioned hash table in hash join (#14480)
---
 be/src/runtime/runtime_state.h                     |   7 +
 be/src/vec/common/hash_table/hash_table.h          | 167 ++++---
 .../vec/common/hash_table/partitioned_hash_map.h   |  64 +++
 .../vec/common/hash_table/partitioned_hash_table.h | 551 +++++++++++++++++++++
 be/src/vec/exec/join/vhash_join_node.cpp           |  47 +-
 be/src/vec/exec/join/vhash_join_node.h             |  14 +-
 .../vec/runtime/shared_hash_table_controller.cpp   |   2 +-
 .../java/org/apache/doris/qe/SessionVariable.java  |  17 +
 gensrc/thrift/PaloInternalService.thrift           |   2 +
 9 files changed, 795 insertions(+), 76 deletions(-)

diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 69b40f6c6f..cc9d8a4831 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -357,6 +357,13 @@ public:
         return _query_options.__isset.skip_delete_predicate && 
_query_options.skip_delete_predicate;
     }
 
+    /// Returns the `partitioned_hash_join_rows_threshold` query option,
+    /// or 0 when the option has not been set.
+    int partitioned_hash_join_rows_threshold() const {
+        return _query_options.__isset.partitioned_hash_join_rows_threshold
+                       ? _query_options.partitioned_hash_join_rows_threshold
+                       : 0;
+    }
+
     const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
         return _tablet_commit_infos;
     }
diff --git a/be/src/vec/common/hash_table/hash_table.h 
b/be/src/vec/common/hash_table/hash_table.h
index 0f2a2de0a4..4be24d3649 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -431,7 +431,7 @@ protected:
     friend class Reader;
 
     template <typename, typename, typename, typename, typename, typename, 
size_t>
-    friend class TwoLevelHashTable;
+    friend class PartitionedHashTable;
 
     template <typename SubMaps>
     friend class StringHashTable;
@@ -445,6 +445,15 @@ protected:
     Grower grower;
     int64_t _resize_timer_ns;
 
+    // the bucket count threshold above which it's converted to partitioned hash 
table
+    // > 0: enable convert dynamically
+    // 0: convert is disabled
+    int _partitioned_threshold = 0;
+    // if need resize and bucket count after resize will be >= 
_partitioned_threshold,
+    // this flag is set to true, and resize does not actually happen,
+    // PartitionedHashTable will convert this hash table to partitioned hash 
table
+    bool _need_partition = false;
+
     //factor that will trigger growing the hash table on insert.
     static constexpr float MAX_BUCKET_OCCUPANCY_FRACTION = 0.5f;
 
@@ -452,6 +461,14 @@ protected:
     mutable size_t collisions = 0;
 #endif
 
+    void set_partitioned_threshold(int threshold) { _partitioned_threshold = 
threshold; }
+
+    /// Returns true when dynamic conversion is enabled (_partitioned_threshold > 0)
+    /// and `bucket_count` has reached that threshold.
+    bool check_if_need_partition(size_t bucket_count) const {
+        // The cast cannot change the value: the threshold is checked to be positive first.
+        return _partitioned_threshold > 0 &&
+               bucket_count >= static_cast<size_t>(_partitioned_threshold);
+    }
+
+    /// Returns the _need_partition flag, which resize() sets when it skips growing
+    /// because the new bucket count reached _partitioned_threshold.
+    bool need_partition() const { return _need_partition; }
+
     /// Find a cell with the same key or an empty cell, starting from the 
specified position and further along the collision resolution chain.
     size_t ALWAYS_INLINE find_cell(const Key& x, size_t hash_value, size_t 
place_value) const {
         while (!buf[place_value].is_zero(*this) &&
@@ -501,63 +518,6 @@ protected:
         }
     }
 
-    /// Increase the size of the buffer.
-    void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) {
-        SCOPED_RAW_TIMER(&_resize_timer_ns);
-#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
-        Stopwatch watch;
-#endif
-
-        size_t old_size = grower.buf_size();
-
-        /** In case of exception for the object to remain in the correct state,
-          *  changing the variable `grower` (which determines the buffer size 
of the hash table)
-          *  is postponed for a moment after a real buffer change.
-          * The temporary variable `new_grower` is used to determine the new 
size.
-          */
-        Grower new_grower = grower;
-        if (for_num_elems) {
-            new_grower.set(for_num_elems);
-            if (new_grower.buf_size() <= old_size) return;
-        } else if (for_buf_size) {
-            new_grower.set_buf_size(for_buf_size);
-            if (new_grower.buf_size() <= old_size) return;
-        } else
-            new_grower.increase_size();
-
-        /// Expand the space.
-        buf = reinterpret_cast<Cell*>(Allocator::realloc(buf, 
get_buffer_size_in_bytes(),
-                                                         new_grower.buf_size() 
* sizeof(Cell)));
-        grower = new_grower;
-
-        /** Now some items may need to be moved to a new location.
-          * The element can stay in place, or move to a new location "on the 
right",
-          *  or move to the left of the collision resolution chain, because 
the elements to the left of it have been moved to the new "right" location.
-          */
-        size_t i = 0;
-        for (; i < old_size; ++i)
-            if (!buf[i].is_zero(*this) && !buf[i].is_deleted())
-                reinsert(buf[i], buf[i].get_hash(*this));
-
-        /** There is also a special case:
-          *    if the element was to be at the end of the old buffer,          
        [        x]
-          *    but is at the beginning because of the collision resolution 
chain,      [o       x]
-          *    then after resizing, it will first be out of place again,       
        [        xo        ]
-          *    and in order to transfer it where necessary,
-          *    after transferring all the elements from the old halves you 
need to     [         o   x    ]
-          *    process tail from the collision resolution chain immediately 
after it   [        o    x    ]
-          */
-        for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i)
-            reinsert(buf[i], buf[i].get_hash(*this));
-
-#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
-        watch.stop();
-        std::cerr << std::fixed << std::setprecision(3) << "Resize from " << 
old_size << " to "
-                  << grower.buf_size() << " took " << watch.elapsedSeconds() 
<< " sec."
-                  << std::endl;
-#endif
-    }
-
     /** Paste into the new buffer the value that was in the old buffer.
       * Used when increasing the buffer size.
       */
@@ -684,6 +644,8 @@ public:
         std::swap(buf, rhs.buf);
         std::swap(m_size, rhs.m_size);
         std::swap(grower, rhs.grower);
+        std::swap(_need_partition, rhs._need_partition);
+        std::swap(_partitioned_threshold, rhs._partitioned_threshold);
 
         Hash::operator=(std::move(rhs));
         Allocator::operator=(std::move(rhs));
@@ -816,10 +778,12 @@ protected:
                 throw;
             }
 
-            // The hash table was rehashed, so we have to re-find the key.
-            size_t new_place = find_cell(key, hash_value, 
grower.place(hash_value));
-            assert(!buf[new_place].is_zero(*this));
-            it = &buf[new_place];
+            if (LIKELY(!_need_partition)) {
+                // The hash table was rehashed, so we have to re-find the key.
+                size_t new_place = find_cell(key, hash_value, 
grower.place(hash_value));
+                assert(!buf[new_place].is_zero(*this));
+                it = &buf[new_place];
+            }
         }
     }
 
@@ -853,10 +817,12 @@ protected:
                 throw;
             }
 
-            // The hash table was rehashed, so we have to re-find the key.
-            size_t new_place = find_cell(key, hash_value, 
grower.place(hash_value));
-            assert(!buf[new_place].is_zero(*this));
-            it = &buf[new_place];
+            if (LIKELY(!_need_partition)) {
+                // The hash table was rehashed, so we have to re-find the key.
+                size_t new_place = find_cell(key, hash_value, 
grower.place(hash_value));
+                assert(!buf[new_place].is_zero(*this));
+                it = &buf[new_place];
+            }
         }
     }
 
@@ -1076,7 +1042,9 @@ public:
 
     float get_factor() const { return MAX_BUCKET_OCCUPANCY_FRACTION; }
 
-    bool should_be_shrink(int64_t valid_row) { return valid_row < get_factor() 
* (size() / 2.0); }
+    bool should_be_shrink(int64_t valid_row) const {
+        return valid_row < get_factor() * (size() / 2.0);
+    }
 
     void init_buf_size(size_t reserve_for_num_elements) {
         free();
@@ -1118,4 +1086,69 @@ public:
 #ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
     size_t getCollisions() const { return collisions; }
 #endif
+
+private:
+    /// Increase the size of the buffer.
+    void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) {
+        SCOPED_RAW_TIMER(&_resize_timer_ns);
+#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
+        Stopwatch watch;
+#endif
+
+        size_t old_size = grower.buf_size();
+
+        /** In case of exception for the object to remain in the correct state,
+          *  changing the variable `grower` (which determines the buffer size 
of the hash table)
+          *  is postponed for a moment after a real buffer change.
+          * The temporary variable `new_grower` is used to determine the new 
size.
+          */
+        Grower new_grower = grower;
+        if (for_num_elems) {
+            new_grower.set(for_num_elems);
+            if (new_grower.buf_size() <= old_size) return;
+        } else if (for_buf_size) {
+            new_grower.set_buf_size(for_buf_size);
+            if (new_grower.buf_size() <= old_size) return;
+        } else
+            new_grower.increase_size();
+
+        // The new bucket count would reach or exceed the partitioning threshold:
+        // skip the resize and only set the need-partition flag.
+        if (check_if_need_partition(new_grower.buf_size())) {
+            _need_partition = true;
+            return;
+        }
+
+        /// Expand the space.
+        buf = reinterpret_cast<Cell*>(Allocator::realloc(buf, 
get_buffer_size_in_bytes(),
+                                                         new_grower.buf_size() 
* sizeof(Cell)));
+        grower = new_grower;
+
+        /** Now some items may need to be moved to a new location.
+          * The element can stay in place, or move to a new location "on the 
right",
+          *  or move to the left of the collision resolution chain, because 
the elements to the left of it have been moved to the new "right" location.
+          */
+        size_t i = 0;
+        for (; i < old_size; ++i)
+            if (!buf[i].is_zero(*this) && !buf[i].is_deleted())
+                reinsert(buf[i], buf[i].get_hash(*this));
+
+        /** There is also a special case:
+          *    if the element was to be at the end of the old buffer,          
        [        x]
+          *    but is at the beginning because of the collision resolution 
chain,      [o       x]
+          *    then after resizing, it will first be out of place again,       
        [        xo        ]
+          *    and in order to transfer it where necessary,
+          *    after transferring all the elements from the old halves you 
need to     [         o   x    ]
+          *    process tail from the collision resolution chain immediately 
after it   [        o    x    ]
+          */
+        for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i)
+            reinsert(buf[i], buf[i].get_hash(*this));
+
+#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
+        watch.stop();
+        std::cerr << std::fixed << std::setprecision(3) << "Resize from " << 
old_size << " to "
+                  << grower.buf_size() << " took " << watch.elapsedSeconds() 
<< " sec."
+                  << std::endl;
+#endif
+    }
 };
diff --git a/be/src/vec/common/hash_table/partitioned_hash_map.h 
b/be/src/vec/common/hash_table/partitioned_hash_map.h
new file mode 100644
index 0000000000..fabf61b27a
--- /dev/null
+++ b/be/src/vec/common/hash_table/partitioned_hash_map.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/TwoLevelHashMap.h
+// and modified by Doris
+#pragma once
+
+#include "vec/common/hash_table/hash_map.h"
+#include "vec/common/hash_table/partitioned_hash_table.h"
+
+/// Map flavour of PartitionedHashTable: adds `operator[]` on top of the
+/// partitioned table built from `ImplTable<Key, Cell, Hash, Grower, Allocator>`.
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+class PartitionedHashMapTable
+        : public PartitionedHashTable<Key, Cell, Hash, Grower, Allocator,
+                                      ImplTable<Key, Cell, Hash, Grower, Allocator>> {
+public:
+    using Impl = ImplTable<Key, Cell, Hash, Grower, Allocator>;
+    using Base = PartitionedHashTable<Key, Cell, Hash, Grower, Allocator, Impl>;
+    using LookupResult = typename Impl::LookupResult;
+    using mapped_type = typename Cell::Mapped;
+
+    using Base::Base;
+    using Base::prefetch;
+
+    /// Returns a reference to the value mapped to `x`; if the key was not present
+    /// it is inserted and its value is default-constructed in place.
+    mapped_type& ALWAYS_INLINE operator[](const Key& x) {
+        LookupResult found;
+        bool is_new;
+        this->emplace(x, found, is_new);
+
+        auto mapped = lookup_result_get_mapped(found);
+        if (is_new) {
+            new (mapped) mapped_type();
+        }
+
+        return *mapped;
+    }
+};
+
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = 
HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+using PartitionedHashMap = PartitionedHashMapTable<Key, HashMapCell<Key, 
Mapped, Hash>, Hash,
+                                                   Grower, Allocator, 
ImplTable>;
+
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = 
HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+using PartitionedHashMapWithSavedHash =
+        PartitionedHashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, 
Hash>, Hash, Grower,
+                                Allocator, ImplTable>;
diff --git a/be/src/vec/common/hash_table/partitioned_hash_table.h 
b/be/src/vec/common/hash_table/partitioned_hash_table.h
new file mode 100644
index 0000000000..d0cdc25f89
--- /dev/null
+++ b/be/src/vec/common/hash_table/partitioned_hash_table.h
@@ -0,0 +1,551 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/TwoLevelHashTable.h
+// and modified by Doris
+#pragma once
+
+#include "vec/common/hash_table/hash_table.h"
+
+/** Partitioned hash table.
+  * Represents 16 (or 1ULL << BITS_FOR_SUB_TABLE) small hash tables (sub table 
count of the first level).
+  * To determine which one to use, one of the bytes of the hash function is 
taken.
+  *
+  * Usually works a little slower than a simple hash table.
+  * However, it has advantages in some cases:
+  * - if you need to merge two hash tables together, then you can easily 
parallelize it by sub tables;
+  * - delay during resizes is amortized, since the small hash tables will be 
resized separately;
+  * - in theory, resizes are cache-local in a larger range of sizes.
+  */
+
+template <size_t initial_size_degree = 8>
+struct PartitionedHashTableGrower : public HashTableGrowerWithPrecalculation<initial_size_degree> {
+    /// Increase the size of the hash table: raise the size degree by 2 while it is
+    /// below 15, and by 1 once it has reached 15.
+    void increase_size() {
+        const bool reached_degree_15 = this->size_degree() >= 15;
+        this->increase_size_degree(reached_degree_15 ? 1 : 2);
+    }
+};
+
+template <typename Key, typename Cell, typename Hash, typename Grower, 
typename Allocator,
+          typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator>,
+          size_t BITS_FOR_SUB_TABLE = 4>
+class PartitionedHashTable : private boost::noncopyable,
+                             protected Hash /// empty base optimization
+{
+public:
+    using Impl = ImplTable;
+
+    using key_type = typename Impl::key_type;
+    using mapped_type = typename Impl::mapped_type;
+    using value_type = typename Impl::value_type;
+    using cell_type = typename Impl::cell_type;
+
+    using LookupResult = typename Impl::LookupResult;
+    using ConstLookupResult = typename Impl::ConstLookupResult;
+
+protected:
+    friend class const_iterator;
+    friend class iterator;
+
+    using HashValue = size_t;
+    using Self = PartitionedHashTable;
+
+private:
+    static constexpr size_t NUM_LEVEL1_SUB_TABLES = 1ULL << BITS_FOR_SUB_TABLE;
+    static constexpr size_t MAX_SUB_TABLE = NUM_LEVEL1_SUB_TABLES - 1;
+
+    //factor that will trigger growing the hash table on insert.
+    static constexpr float MAX_SUB_TABLE_OCCUPANCY_FRACTION = 0.5f;
+
+    Impl level0_sub_table;
+    Impl level1_sub_tables[NUM_LEVEL1_SUB_TABLES];
+
+    bool _is_partitioned = false;
+
+    int64_t _convert_timer_ns = 0;
+
+public:
+    PartitionedHashTable() = default;
+
+    PartitionedHashTable(PartitionedHashTable&& rhs) { *this = std::move(rhs); 
}
+
+    /// Move assignment: takes over the sub tables of `rhs` and exchanges the
+    /// partitioned flag and the conversion timer with it.
+    PartitionedHashTable& operator=(PartitionedHashTable&& rhs) {
+        // Self-move is a no-op, so we never depend on the sub tables' own
+        // move assignment coping with it.
+        if (this == &rhs) {
+            return *this;
+        }
+
+        std::swap(_is_partitioned, rhs._is_partitioned);
+        // Keep the conversion timer with the data it was measured on, so that
+        // get_convert_timer_value() stays meaningful on the moved-to table.
+        std::swap(_convert_timer_ns, rhs._convert_timer_ns);
+
+        level0_sub_table = std::move(rhs.level0_sub_table);
+        for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+            level1_sub_tables[i] = std::move(rhs.level1_sub_tables[i]);
+        }
+
+        Hash::operator=(std::move(rhs));
+        return *this;
+    }
+
+    size_t hash(const Key& x) const { return Hash::operator()(x); }
+
+    float get_factor() const { return MAX_SUB_TABLE_OCCUPANCY_FRACTION; }
+
+    int64_t get_convert_timer_value() const { return _convert_timer_ns; }
+
+    bool should_be_shrink(int64_t valid_row) const {
+        if (_is_partitioned) {
+            return false;
+        } else {
+            return level0_sub_table.should_be_shrink(valid_row);
+        }
+    }
+
+    template <typename Func>
+    void ALWAYS_INLINE for_each_value(Func&& func) {
+        if (_is_partitioned) {
+            for (auto i = 0u; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                level1_sub_tables[i].for_each_value(func);
+            }
+        } else {
+            level0_sub_table.for_each_value(func);
+        }
+    }
+
+    size_t get_size() {
+        size_t count = 0;
+        if (_is_partitioned) {
+            for (auto i = 0u; i < this->NUM_LEVEL1_SUB_TABLES; ++i) {
+                for (auto& v : this->level1_sub_tables[i]) {
+                    count += v.get_second().get_row_count();
+                }
+            }
+        } else {
+            count = level0_sub_table.get_size();
+        }
+        return count;
+    }
+
+    void init_buf_size(size_t reserve_for_num_elements) {
+        if (_is_partitioned) {
+            for (auto& impl : level1_sub_tables) {
+                impl.init_buf_size(reserve_for_num_elements / 
NUM_LEVEL1_SUB_TABLES);
+            }
+        } else {
+            if 
(level0_sub_table.check_if_need_partition(reserve_for_num_elements)) {
+                level0_sub_table.clear_and_shrink();
+                _is_partitioned = true;
+
+                for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                    
level1_sub_tables[i].init_buf_size(reserve_for_num_elements /
+                                                       NUM_LEVEL1_SUB_TABLES);
+                }
+            } else {
+                level0_sub_table.init_buf_size(reserve_for_num_elements);
+            }
+        }
+    }
+
+    void delete_zero_key(Key key) {
+        if (_is_partitioned) {
+            const auto key_hash = hash(key);
+            size_t sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].delete_zero_key(key);
+        } else {
+            level0_sub_table.delete_zero_key(key);
+        }
+    }
+
+    size_t get_buffer_size_in_bytes() const {
+        if (_is_partitioned) {
+            size_t buff_size = 0;
+            for (const auto& impl : level1_sub_tables) buff_size += 
impl.get_buffer_size_in_bytes();
+            return buff_size;
+        } else {
+            return level0_sub_table.get_buffer_size_in_bytes();
+        }
+    }
+
+    size_t get_buffer_size_in_cells() const {
+        if (_is_partitioned) {
+            size_t buff_size = 0;
+            for (const auto& impl : level1_sub_tables) buff_size += 
impl.get_buffer_size_in_cells();
+            return buff_size;
+        } else {
+            return level0_sub_table.get_buffer_size_in_cells();
+        }
+    }
+
+    std::vector<size_t> get_buffer_sizes_in_cells() const {
+        std::vector<size_t> sizes;
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                
sizes.push_back(level1_sub_tables[i].get_buffer_size_in_cells());
+            }
+        } else {
+            sizes.push_back(level0_sub_table.get_buffer_size_in_cells());
+        }
+        return sizes;
+    }
+
+    void reset_resize_timer() {
+        if (_is_partitioned) {
+            for (auto& impl : level1_sub_tables) {
+                impl.reset_resize_timer();
+            }
+        } else {
+            level0_sub_table.reset_resize_timer();
+        }
+    }
+    int64_t get_resize_timer_value() const {
+        if (_is_partitioned) {
+            int64_t resize_timer_ns = 0;
+            for (const auto& impl : level1_sub_tables) {
+                resize_timer_ns += impl.get_resize_timer_value();
+            }
+            return resize_timer_ns;
+        } else {
+            return level0_sub_table.get_resize_timer_value();
+        }
+    }
+
+protected:
+    /// Starting at `sub_table_idx`, advance it to the first non-empty level-1 sub
+    /// table and return that sub table's begin(). If none is left, `sub_table_idx`
+    /// is set to MAX_SUB_TABLE and the end() of the last sub table is returned.
+    typename Impl::iterator begin_of_next_non_empty_sub_table_idx(size_t& sub_table_idx) {
+        for (; sub_table_idx != NUM_LEVEL1_SUB_TABLES; ++sub_table_idx) {
+            auto& candidate = level1_sub_tables[sub_table_idx];
+            if (!candidate.empty()) {
+                return candidate.begin();
+            }
+        }
+
+        // Every remaining sub table was empty: step back onto the last valid index.
+        sub_table_idx = MAX_SUB_TABLE;
+        return level1_sub_tables[MAX_SUB_TABLE].end();
+    }
+
+    /// Const overload: starting at `sub_table_idx`, advance it to the first
+    /// non-empty level-1 sub table and return that sub table's begin(). If none is
+    /// left, `sub_table_idx` is set to MAX_SUB_TABLE and the end() of the last sub
+    /// table is returned.
+    typename Impl::const_iterator begin_of_next_non_empty_sub_table_idx(
+            size_t& sub_table_idx) const {
+        for (; sub_table_idx != NUM_LEVEL1_SUB_TABLES; ++sub_table_idx) {
+            const auto& candidate = level1_sub_tables[sub_table_idx];
+            if (!candidate.empty()) {
+                return candidate.begin();
+            }
+        }
+
+        // Every remaining sub table was empty: step back onto the last valid index.
+        sub_table_idx = MAX_SUB_TABLE;
+        return level1_sub_tables[MAX_SUB_TABLE].end();
+    }
+
+public:
+    void set_partitioned_threshold(int threshold) {
+        level0_sub_table.set_partitioned_threshold(threshold);
+    }
+
+    class iterator /// NOLINT
+    {
+        Self* container {};
+        size_t sub_table_idx {};
+        typename Impl::iterator current_it {};
+
+        friend class PartitionedHashTable;
+
+        iterator(Self* container_, size_t sub_table_idx_, typename 
Impl::iterator current_it_)
+                : container(container_), sub_table_idx(sub_table_idx_), 
current_it(current_it_) {}
+
+    public:
+        iterator() = default;
+
+        bool operator==(const iterator& rhs) const {
+            return sub_table_idx == rhs.sub_table_idx && current_it == 
rhs.current_it;
+        }
+        bool operator!=(const iterator& rhs) const { return !(*this == rhs); }
+
+        iterator& operator++() {
+            ++current_it;
+            if (container->_is_partitioned) {
+                if (current_it == 
container->level1_sub_tables[sub_table_idx].end()) {
+                    ++sub_table_idx;
+                    current_it = 
container->begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+                }
+            }
+
+            return *this;
+        }
+
+        Cell& operator*() const { return *current_it; }
+        Cell* operator->() const { return current_it.get_ptr(); }
+
+        Cell* get_ptr() const { return current_it.get_ptr(); }
+        size_t get_hash() const { return current_it.get_hash(); }
+    };
+
+    /// Read-only forward iterator over the cells of the table.
+    /// When the container is partitioned it walks the level-1 sub tables in index
+    /// order, moving to the next non-empty sub table whenever the current one is
+    /// exhausted; otherwise it simply advances the underlying Impl iterator.
+    class const_iterator /// NOLINT
+    {
+        // Pointer-to-const so that begin() const / end() const can pass `this`.
+        const Self* container {};
+        size_t sub_table_idx {};
+        typename Impl::const_iterator current_it {};
+
+        friend class PartitionedHashTable;
+
+        const_iterator(const Self* container_, size_t sub_table_idx_,
+                       typename Impl::const_iterator current_it_)
+                : container(container_), sub_table_idx(sub_table_idx_), current_it(current_it_) {}
+
+    public:
+        const_iterator() = default;
+        /// Conversion from the mutable iterator (same position, read-only view).
+        const_iterator(const iterator& rhs)
+                : container(rhs.container),
+                  sub_table_idx(rhs.sub_table_idx),
+                  current_it(rhs.current_it) {} /// NOLINT
+
+        /// Two iterators are equal when they point at the same position of the
+        /// same sub table index; the container pointer is not compared.
+        bool operator==(const const_iterator& rhs) const {
+            return sub_table_idx == rhs.sub_table_idx && current_it == rhs.current_it;
+        }
+        bool operator!=(const const_iterator& rhs) const { return !(*this == rhs); }
+
+        const_iterator& operator++() {
+            ++current_it;
+            if (container->_is_partitioned) {
+                // End of the current sub table: continue with the next non-empty one.
+                if (current_it == container->level1_sub_tables[sub_table_idx].end()) {
+                    ++sub_table_idx;
+                    current_it = container->begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+                }
+            }
+
+            return *this;
+        }
+
+        const Cell& operator*() const { return *current_it; }
+        // get_ptr() belongs to the underlying iterator, not to the cell it points to,
+        // so it must be reached with `.` exactly as in get_ptr() below.
+        const Cell* operator->() const { return current_it.get_ptr(); }
+
+        const Cell* get_ptr() const { return current_it.get_ptr(); }
+        size_t get_hash() const { return current_it.get_hash(); }
+    };
+
+    const_iterator begin() const {
+        if (_is_partitioned) {
+            size_t sub_table_idx = 0;
+            typename Impl::const_iterator impl_it =
+                    begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+            return {this, sub_table_idx, impl_it};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.begin()};
+        }
+    }
+
+    iterator begin() {
+        if (_is_partitioned) {
+            size_t sub_table_idx = 0;
+            typename Impl::iterator impl_it = 
begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+            return {this, sub_table_idx, impl_it};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.begin()};
+        }
+    }
+
+    const_iterator end() const {
+        if (_is_partitioned) {
+            return {this, MAX_SUB_TABLE, 
level1_sub_tables[MAX_SUB_TABLE].end()};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.end()};
+        }
+    }
+    iterator end() {
+        if (_is_partitioned) {
+            return {this, MAX_SUB_TABLE, 
level1_sub_tables[MAX_SUB_TABLE].end()};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.end()};
+        }
+    }
+
+    /// Insert a value. In the case of any more complex values, it is better 
to use the `emplace` function.
+    std::pair<LookupResult, bool> ALWAYS_INLINE insert(const value_type& x) {
+        size_t hash_value = hash(Cell::get_key(x));
+
+        std::pair<LookupResult, bool> res;
+        emplace(Cell::get_key(x), res.first, res.second, hash_value);
+
+        if (res.second) insert_set_mapped(lookup_result_get_mapped(res.first), 
x);
+
+        return res;
+    }
+
+    void expanse_for_add_elem(size_t num_elem) {
+        if (_is_partitioned) {
+            size_t num_elem_per_sub_table =
+                    (num_elem + NUM_LEVEL1_SUB_TABLES - 1) / 
NUM_LEVEL1_SUB_TABLES;
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                
level1_sub_tables[i].expanse_for_add_elem(num_elem_per_sub_table);
+            }
+        } else {
+            level0_sub_table.expanse_for_add_elem(num_elem);
+            if (UNLIKELY(level0_sub_table.need_partition())) {
+                convert_to_partitioned();
+            }
+        }
+    }
+
+    template <typename KeyHolder>
+    void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
+        if (_is_partitioned) {
+            const auto& key = key_holder_get_key(key_holder);
+            const auto key_hash = hash(key);
+            const auto sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].prefetch(key_holder);
+        } else {
+            level0_sub_table.prefetch(key_holder);
+        }
+    }
+
+    template <bool READ>
+    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
+        if (_is_partitioned) {
+            const auto sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].template 
prefetch_by_hash<READ>(hash_value);
+        } else {
+            level0_sub_table.template prefetch_by_hash<READ>(hash_value);
+        }
+    }
+
+    template <bool READ, typename KeyHolder>
+    void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
+        if (_is_partitioned) {
+            const auto& key = key_holder_get_key(key_holder);
+            const auto key_hash = hash(key);
+            const auto sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].template 
prefetch<READ>(key_holder);
+        } else {
+            level0_sub_table.template prefetch<READ>(key_holder);
+        }
+    }
+
+    /** Insert the key.
+      * On return `it` refers to a position that can be used for `placement new` of the value,
+      * and `inserted` tells whether a new key was inserted.
+      *
+      * If a new key was inserted you must `placement new` the value yourself,
+      * because the destructor will be invoked for it when the hash table is destroyed!
+      *
+      * Example usage:
+      *
+      * Map::iterator it;
+      * bool inserted;
+      * map.emplace(key, it, inserted);
+      * if (inserted)
+      *     new(&it->second) Mapped(value);
+      */
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) {
+        // Hash the key once here and hand it to the overload that takes a precalculated hash.
+        size_t key_hash = hash(key_holder_get_key(key_holder));
+        emplace(key_holder, it, inserted, key_hash);
+    }
+
+    /// Insert the key using a precalculated value of the hash function.
+    /// While the table is not partitioned the key goes into the level-0 table; if that insert
+    /// makes the level-0 table report need_partition(), the table is converted on the spot.
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted,
+                               size_t hash_value) {
+        if (!_is_partitioned) {
+            level0_sub_table.emplace(key_holder, it, inserted, hash_value);
+            if (UNLIKELY(level0_sub_table.need_partition())) {
+                convert_to_partitioned();
+
+                // Conversion moved the cells into the level-1 sub-tables, so `it` has to be
+                // looked up again in the sub-table that now owns the key.
+                size_t owner_idx = get_sub_table_from_hash(hash_value);
+                it = level1_sub_tables[owner_idx].find(key_holder_get_key(key_holder),
+                                                       hash_value);
+            }
+            return;
+        }
+
+        size_t owner_idx = get_sub_table_from_hash(hash_value);
+        level1_sub_tables[owner_idx].emplace(key_holder, it, inserted, hash_value);
+    }
+
+    /// Overload with `hash_value` and `inserted` in swapped positions; it only reorders the
+    /// arguments and delegates to emplace(key_holder, it, inserted, hash_value).
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
+                               bool& inserted) {
+        emplace(key_holder, it, inserted, hash_value);
+    }
+
+    /// Look up `x` using a precalculated hash, in the level-0 table before partitioning and in
+    /// the level-1 sub-table selected by the hash afterwards.
+    LookupResult ALWAYS_INLINE find(Key x, size_t hash_value) {
+        if (!_is_partitioned) {
+            return level0_sub_table.find(x, hash_value);
+        }
+        size_t owner_idx = get_sub_table_from_hash(hash_value);
+        return level1_sub_tables[owner_idx].find(x, hash_value);
+    }
+
+    /// Const lookup with a precalculated hash; casts away constness and delegates to the
+    /// non-const overload.
+    ConstLookupResult ALWAYS_INLINE find(Key x, size_t hash_value) const {
+        auto* mutable_self = const_cast<std::decay_t<decltype(*this)>*>(this);
+        return mutable_self->find(x, hash_value);
+    }
+
+    /// Lookup that computes the hash of `x` itself.
+    LookupResult ALWAYS_INLINE find(Key x) {
+        size_t hash_value = hash(x);
+        return find(x, hash_value);
+    }
+
+    /// Const lookup that computes the hash of `x` itself.
+    ConstLookupResult ALWAYS_INLINE find(Key x) const {
+        size_t hash_value = hash(x);
+        return find(x, hash_value);
+    }
+
+    /// Number of elements: the level-0 table's size before partitioning, otherwise the sum of
+    /// the sizes of all level-1 sub-tables.
+    size_t size() const {
+        if (!_is_partitioned) {
+            return level0_sub_table.size();
+        }
+
+        size_t total = 0;
+        for (size_t idx = 0; idx != NUM_LEVEL1_SUB_TABLES; ++idx) {
+            total += level1_sub_tables[idx].size();
+        }
+        return total;
+    }
+
+    /// Per-table element counts: a single entry for the level-0 table before partitioning,
+    /// otherwise one entry per level-1 sub-table in index order.
+    std::vector<size_t> sizes() const {
+        std::vector<size_t> per_table;
+        if (!_is_partitioned) {
+            per_table.push_back(level0_sub_table.size());
+            return per_table;
+        }
+
+        for (size_t idx = 0; idx != NUM_LEVEL1_SUB_TABLES; ++idx) {
+            per_table.push_back(level1_sub_tables[idx].size());
+        }
+        return per_table;
+    }
+
+    /// True when the table that currently stores keys holds no elements; once partitioned,
+    /// every level-1 sub-table has to be empty.
+    bool empty() const {
+        if (!_is_partitioned) {
+            return level0_sub_table.empty();
+        }
+
+        for (size_t idx = 0; idx != NUM_LEVEL1_SUB_TABLES; ++idx) {
+            if (!level1_sub_tables[idx].empty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+private:
+    /// Move every cell of the level-0 table into the level-1 sub-tables and switch the table
+    /// into partitioned mode. Time spent here is accumulated into `_convert_timer_ns`.
+    void convert_to_partitioned() {
+        SCOPED_RAW_TIMER(&_convert_timer_ns);
+
+        // Every sub-table starts with an equal share of the level-0 bucket count.
+        const auto bucket_count = level0_sub_table.get_buffer_size_in_cells();
+        for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+            // The temporary is already an rvalue, so no std::move is needed.
+            level1_sub_tables[i] = Impl(bucket_count / NUM_LEVEL1_SUB_TABLES);
+        }
+
+        // Switch modes BEFORE moving any cell. The zero key below is re-inserted through
+        // insert(); while `_is_partitioned` is still false, emplace() writes into
+        // level0_sub_table (the table being iterated) and, with need_partition() still true,
+        // calls convert_to_partitioned() again without bound.
+        // NOTE(review): assumes insert() dispatches on `_is_partitioned` like emplace() does.
+        _is_partitioned = true;
+
+        auto it = level0_sub_table.begin();
+
+        /// It is assumed that the zero key (stored separately) is first in iteration order.
+        if (it != level0_sub_table.end() && it.get_ptr()->is_zero(level0_sub_table)) {
+            insert(it->get_value());
+            ++it;
+        }
+
+        // The remaining cells are non-zero; place each one directly into the sub-table
+        // selected by its hash.
+        for (; it != level0_sub_table.end(); ++it) {
+            const Cell* cell = it.get_ptr();
+            size_t hash_value = cell->get_hash(level0_sub_table);
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].insert_unique_non_zero(cell, hash_value);
+        }
+
+        level0_sub_table.clear_and_shrink();
+    }
+
+    /// Map a hash value to a level-1 sub-table index: shift the hash right by
+    /// (32 - BITS_FOR_SUB_TABLE) and mask the result with MAX_SUB_TABLE.
+    /// NOTE Bad for hash tables with more than 2^32 cells.
+    static size_t get_sub_table_from_hash(size_t hash_value) {
+        const size_t shifted = hash_value >> (32 - BITS_FOR_SUB_TABLE);
+        return shifted & MAX_SUB_TABLE;
+    }
+};
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp 
b/be/src/vec/exec/join/vhash_join_node.cpp
index 79beb01a83..da144378b1 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -73,20 +73,37 @@ struct ProcessHashTableBuild {
 
         Defer defer {[&]() {
             int64_t bucket_size = 
hash_table_ctx.hash_table.get_buffer_size_in_cells();
+            int64_t filled_bucket_size = hash_table_ctx.hash_table.size();
             int64_t bucket_bytes = 
hash_table_ctx.hash_table.get_buffer_size_in_bytes();
             _join_node->_mem_used += bucket_bytes - old_bucket_bytes;
             COUNTER_SET(_join_node->_build_buckets_counter, bucket_size);
+            COUNTER_SET(_join_node->_build_buckets_fill_counter, 
filled_bucket_size);
+
+            auto hash_table_buckets = 
hash_table_ctx.hash_table.get_buffer_sizes_in_cells();
+            std::string hash_table_buckets_info;
+            for (auto bucket_count : hash_table_buckets) {
+                hash_table_buckets_info += std::to_string(bucket_count) + ", ";
+            }
+            _join_node->add_hash_buckets_info(hash_table_buckets_info);
+
+            auto hash_table_sizes = hash_table_ctx.hash_table.sizes();
+            hash_table_buckets_info.clear();
+            for (auto table_size : hash_table_sizes) {
+                hash_table_buckets_info += std::to_string(table_size) + ", ";
+            }
+            _join_node->add_hash_buckets_filled_info(hash_table_buckets_info);
         }};
 
         KeyGetter key_getter(_build_raw_ptrs, _join_node->_build_key_sz, 
nullptr);
 
         SCOPED_TIMER(_join_node->_build_table_insert_timer);
+        hash_table_ctx.hash_table.reset_resize_timer();
+
         // only not build_unique, we need expanse hash table before insert data
         if (!_join_node->_build_unique) {
             // _rows contains null row, which will cause hash table resize to 
be large.
             
RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem(_rows));
         }
-        hash_table_ctx.hash_table.reset_resize_timer();
 
         vector<int>& inserted_rows = 
_join_node->_inserted_rows[&_acquired_block];
         bool has_runtime_filter = !_join_node->_runtime_filter_descs.empty();
@@ -172,6 +189,9 @@ struct ProcessHashTableBuild {
 
         COUNTER_UPDATE(_join_node->_build_table_expanse_timer,
                        hash_table_ctx.hash_table.get_resize_timer_value());
+        COUNTER_UPDATE(_join_node->_build_table_convert_timer,
+                       hash_table_ctx.hash_table.get_convert_timer_value());
+
         return Status::OK();
     }
 
@@ -337,6 +357,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _build_table_insert_timer = ADD_TIMER(build_phase_profile, 
"BuildTableInsertTime");
     _build_expr_call_timer = ADD_TIMER(build_phase_profile, 
"BuildExprCallTime");
     _build_table_expanse_timer = ADD_TIMER(build_phase_profile, 
"BuildTableExpanseTime");
+    _build_table_convert_timer =
+            ADD_TIMER(build_phase_profile, 
"BuildTableConvertToPartitionedTime");
     _build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows", 
TUnit::UNIT);
     _build_side_compute_hash_timer = ADD_TIMER(build_phase_profile, 
"BuildSideHashComputingTime");
 
@@ -355,6 +377,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
     _push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
     _build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets", 
TUnit::UNIT);
+    _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(), 
"FilledBuckets", TUnit::UNIT);
 
     if (_is_broadcast_join) {
         runtime_profile()->add_info_string("BroadcastJoin", "true");
@@ -374,13 +397,21 @@ Status HashJoinNode::prepare(RuntimeState* state) {
     _left_table_data_types = 
VectorizedUtils::get_data_types(child(0)->row_desc());
 
     // Hash Table Init
-    _hash_table_init();
+    _hash_table_init(state);
     _process_hashtable_ctx_variants_init(state);
     _construct_mutable_join_block();
 
     return Status::OK();
 }
 
+// Passes `info` to the runtime profile's add_info_string() under the name "HashTableBuckets".
+void HashJoinNode::add_hash_buckets_info(const std::string& info) {
+    runtime_profile()->add_info_string("HashTableBuckets", info);
+}
+
+// Passes `info` to the runtime profile's add_info_string() under the name
+// "HashTableFilledBuckets".
+void HashJoinNode::add_hash_buckets_filled_info(const std::string& info) {
+    runtime_profile()->add_info_string("HashTableFilledBuckets", info);
+}
+
 Status HashJoinNode::close(RuntimeState* state) {
     if (is_closed()) {
         return Status::OK();
@@ -833,7 +864,7 @@ Status HashJoinNode::_process_build_block(RuntimeState* 
state, Block& block, uin
     return st;
 }
 
-void HashJoinNode::_hash_table_init() {
+void HashJoinNode::_hash_table_init(RuntimeState* state) {
     std::visit(
             [&](auto&& join_op_variants, auto have_other_join_conjunct) {
                 using JoinOpType = std::decay_t<decltype(join_op_variants)>;
@@ -956,6 +987,16 @@ void HashJoinNode::_hash_table_init() {
             _join_op_variants, make_bool_variant(_have_other_join_conjunct));
 
     DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
+
+    std::visit(Overload {[&](std::monostate& arg) {
+                             LOG(FATAL) << "FATAL: uninited hash table";
+                             __builtin_unreachable();
+                         },
+                         [&](auto&& arg) {
+                             arg.hash_table_ptr->set_partitioned_threshold(
+                                     
state->partitioned_hash_join_rows_threshold());
+                         }},
+               *_hash_table_variants);
 }
 
 void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h 
b/be/src/vec/exec/join/vhash_join_node.h
index 691857c986..72d8317468 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -24,7 +24,7 @@
 #include "join_op.h"
 #include "process_hash_table_probe.h"
 #include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h"
+#include "vec/common/hash_table/partitioned_hash_map.h"
 #include "vjoin_node_base.h"
 
 namespace doris {
@@ -39,7 +39,7 @@ class SharedHashTableController;
 template <typename RowRefListType>
 struct SerializedHashTableContext {
     using Mapped = RowRefListType;
-    using HashTable = HashMap<StringRef, Mapped>;
+    using HashTable = PartitionedHashMap<StringRef, Mapped>;
     using State = ColumnsHashing::HashMethodSerialized<typename 
HashTable::value_type, Mapped>;
     using Iter = typename HashTable::iterator;
 
@@ -70,7 +70,7 @@ struct 
IsSerializedHashTableContextTraits<ColumnsHashing::HashMethodSerialized<V
 template <class T, typename RowRefListType>
 struct PrimaryTypeHashTableContext {
     using Mapped = RowRefListType;
-    using HashTable = HashMap<T, Mapped, HashCRC32<T>>;
+    using HashTable = PartitionedHashMap<T, Mapped, HashCRC32<T>>;
     using State =
             ColumnsHashing::HashMethodOneNumber<typename 
HashTable::value_type, Mapped, T, false>;
     using Iter = typename HashTable::iterator;
@@ -105,7 +105,7 @@ using I256HashTableContext = 
PrimaryTypeHashTableContext<UInt256, RowRefListType
 template <class T, bool has_null, typename RowRefListType>
 struct FixedKeyHashTableContext {
     using Mapped = RowRefListType;
-    using HashTable = HashMap<T, Mapped, HashCRC32<T>>;
+    using HashTable = PartitionedHashMap<T, Mapped, HashCRC32<T>>;
     using State = ColumnsHashing::HashMethodKeysFixed<typename 
HashTable::value_type, T, Mapped,
                                                       has_null, false>;
     using Iter = typename HashTable::iterator;
@@ -192,6 +192,8 @@ public:
     Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) 
override;
     Status get_next(RuntimeState* state, Block* block, bool* eos) override;
     Status close(RuntimeState* state) override;
+    void add_hash_buckets_info(const std::string& info);
+    void add_hash_buckets_filled_info(const std::string& info);
 
 private:
     using VExprContexts = std::vector<VExprContext*>;
@@ -218,9 +220,11 @@ private:
     RuntimeProfile::Counter* _build_expr_call_timer;
     RuntimeProfile::Counter* _build_table_insert_timer;
     RuntimeProfile::Counter* _build_table_expanse_timer;
+    RuntimeProfile::Counter* _build_table_convert_timer;
     RuntimeProfile::Counter* _probe_expr_call_timer;
     RuntimeProfile::Counter* _probe_next_timer;
     RuntimeProfile::Counter* _build_buckets_counter;
+    RuntimeProfile::Counter* _build_buckets_fill_counter;
     RuntimeProfile::Counter* _push_down_timer;
     RuntimeProfile::Counter* _push_compute_timer;
     RuntimeProfile::Counter* _search_hashtable_timer;
@@ -281,7 +285,7 @@ private:
 
     void _set_build_ignore_flag(Block& block, const std::vector<int>& 
res_col_ids);
 
-    void _hash_table_init();
+    void _hash_table_init(RuntimeState* state);
     void _process_hashtable_ctx_variants_init(RuntimeState* state);
 
     static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp 
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 8ca3656ad4..7d92dabedf 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -121,10 +121,10 @@ Status 
SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_i
 
 Status SharedHashTableController::wait_for_closable(RuntimeState* state, int 
my_node_id) {
     std::unique_lock<std::mutex> lock(_mutex);
-    RETURN_IF_CANCELLED(state);
     if (!_ref_fragments[my_node_id].empty()) {
         _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); });
     }
+    RETURN_IF_CANCELLED(state);
     return Status::OK();
 }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 59b653a88f..c8622de800 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -229,6 +229,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String INTERNAL_SESSION = "internal_session";
 
+    public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD = 
"partitioned_hash_join_rows_threshold";
+
     // session origin value
     public Map<Field, String> sessionOriginValue = new HashMap<Field, 
String>();
     // check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -601,6 +603,10 @@ public class SessionVariable implements Serializable, 
Writable {
     @VariableMgr.VarAttr(name = INTERNAL_SESSION)
     public boolean internalSession = false;
 
+    // Use partitioned hash join if build side row count >= the threshold. 0 means the threshold is not set.
+    @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD)
+    public int partitionedHashJoinRowsThreshold = 0;
+
     // If this fe is in fuzzy mode, then will use initFuzzyModeVariables to 
generate some variables,
     // not the default value set in the code.
     public void initFuzzyModeVariables() {
@@ -609,6 +615,7 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableLocalExchange = random.nextBoolean();
         this.disableJoinReorder = random.nextBoolean();
         this.disableStreamPreaggregations = random.nextBoolean();
+        // this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 : 
1048576;
     }
 
     public String getBlockEncryptionMode() {
@@ -842,6 +849,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enablePartitionCache = enablePartitionCache;
     }
 
+    /** Returns the current value of the partitionedHashJoinRowsThreshold session variable. */
+    public int getPartitionedHashJoinRowsThreshold() {
+        return partitionedHashJoinRowsThreshold;
+    }
+
+    /** Stores {@code threshold} as the partitionedHashJoinRowsThreshold session variable. */
+    public void setPartitionedHashJoinRowsThreshold(int threshold) {
+        this.partitionedHashJoinRowsThreshold = threshold;
+    }
+
     // Serialize to thrift object
     public boolean getForwardToMaster() {
         return forwardToMaster;
@@ -1244,6 +1259,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
         tResult.setSkipDeletePredicate(skipDeletePredicate);
 
+        
tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
+
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index ded68c7458..b725ea9191 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -179,6 +179,8 @@ struct TQueryOptions {
   51: optional bool enable_new_shuffle_hash_method
 
   52: optional i32 be_exec_version = 0
+  
+  53: optional i32 partitioned_hash_join_rows_threshold = 0
 }
     
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to