This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6c7f758ef7 [improvement](hashjoin) support partitioned hash table in
hash join (#14480)
6c7f758ef7 is described below
commit 6c7f758ef7d2db6d097b8c098702d7a2e0f2fbf5
Author: TengJianPing <[email protected]>
AuthorDate: Thu Nov 24 14:16:47 2022 +0800
[improvement](hashjoin) support partitioned hash table in hash join (#14480)
---
be/src/runtime/runtime_state.h | 7 +
be/src/vec/common/hash_table/hash_table.h | 167 ++++---
.../vec/common/hash_table/partitioned_hash_map.h | 64 +++
.../vec/common/hash_table/partitioned_hash_table.h | 551 +++++++++++++++++++++
be/src/vec/exec/join/vhash_join_node.cpp | 47 +-
be/src/vec/exec/join/vhash_join_node.h | 14 +-
.../vec/runtime/shared_hash_table_controller.cpp | 2 +-
.../java/org/apache/doris/qe/SessionVariable.java | 17 +
gensrc/thrift/PaloInternalService.thrift | 2 +
9 files changed, 795 insertions(+), 76 deletions(-)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 69b40f6c6f..cc9d8a4831 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -357,6 +357,13 @@ public:
return _query_options.__isset.skip_delete_predicate &&
_query_options.skip_delete_predicate;
}
+    /// Row-count threshold for using a partitioned hash table in hash join,
+    /// or 0 when the query option has not been set.
+    int partitioned_hash_join_rows_threshold() const {
+        return _query_options.__isset.partitioned_hash_join_rows_threshold
+                       ? _query_options.partitioned_hash_join_rows_threshold
+                       : 0;
+    }
+
const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
return _tablet_commit_infos;
}
diff --git a/be/src/vec/common/hash_table/hash_table.h
b/be/src/vec/common/hash_table/hash_table.h
index 0f2a2de0a4..4be24d3649 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -431,7 +431,7 @@ protected:
friend class Reader;
template <typename, typename, typename, typename, typename, typename,
size_t>
- friend class TwoLevelHashTable;
+ friend class PartitionedHashTable; // uses protected helpers such as need_partition()
template <typename SubMaps>
friend class StringHashTable;
@@ -445,6 +445,15 @@ protected:
Grower grower;
int64_t _resize_timer_ns;
+ // Bucket-count threshold at or above which this table should be converted to a
+ // partitioned hash table (see check_if_need_partition()):
+ // > 0: dynamic conversion is enabled
+ // <= 0: conversion is disabled (0 is the default)
+ int _partitioned_threshold = 0;
+ // Set to true when a resize is needed and the bucket count after that resize would be
+ // >= _partitioned_threshold. In that case resize() returns without actually resizing, and
+ // PartitionedHashTable is expected to convert this hash table into a partitioned one.
+ bool _need_partition = false;
+
//factor that will trigger growing the hash table on insert.
static constexpr float MAX_BUCKET_OCCUPANCY_FRACTION = 0.5f;
@@ -452,6 +461,14 @@ protected:
mutable size_t collisions = 0;
#endif
+    /// Set the bucket-count threshold that triggers conversion to a partitioned hash table;
+    /// a value <= 0 disables the conversion.
+    void set_partitioned_threshold(int threshold) { _partitioned_threshold = threshold; }
+
+    /// Whether a table with `bucket_count` buckets has reached the partition threshold.
+    bool check_if_need_partition(size_t bucket_count) const {
+        // The threshold is known to be positive on the right of `&&`, so the cast is lossless
+        // and avoids a signed/unsigned comparison.
+        return _partitioned_threshold > 0 &&
+               bucket_count >= static_cast<size_t>(_partitioned_threshold);
+    }
+
+    /// True once resize() bailed out because the table should be partitioned instead.
+    bool need_partition() const { return _need_partition; }
+
/// Find a cell with the same key or an empty cell, starting from the
specified position and further along the collision resolution chain.
size_t ALWAYS_INLINE find_cell(const Key& x, size_t hash_value, size_t
place_value) const {
while (!buf[place_value].is_zero(*this) &&
@@ -501,63 +518,6 @@ protected:
}
}
- /// Increase the size of the buffer.
- void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) {
- SCOPED_RAW_TIMER(&_resize_timer_ns);
-#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
- Stopwatch watch;
-#endif
-
- size_t old_size = grower.buf_size();
-
- /** In case of exception for the object to remain in the correct state,
- * changing the variable `grower` (which determines the buffer size
of the hash table)
- * is postponed for a moment after a real buffer change.
- * The temporary variable `new_grower` is used to determine the new
size.
- */
- Grower new_grower = grower;
- if (for_num_elems) {
- new_grower.set(for_num_elems);
- if (new_grower.buf_size() <= old_size) return;
- } else if (for_buf_size) {
- new_grower.set_buf_size(for_buf_size);
- if (new_grower.buf_size() <= old_size) return;
- } else
- new_grower.increase_size();
-
- /// Expand the space.
- buf = reinterpret_cast<Cell*>(Allocator::realloc(buf,
get_buffer_size_in_bytes(),
- new_grower.buf_size()
* sizeof(Cell)));
- grower = new_grower;
-
- /** Now some items may need to be moved to a new location.
- * The element can stay in place, or move to a new location "on the
right",
- * or move to the left of the collision resolution chain, because
the elements to the left of it have been moved to the new "right" location.
- */
- size_t i = 0;
- for (; i < old_size; ++i)
- if (!buf[i].is_zero(*this) && !buf[i].is_deleted())
- reinsert(buf[i], buf[i].get_hash(*this));
-
- /** There is also a special case:
- * if the element was to be at the end of the old buffer,
[ x]
- * but is at the beginning because of the collision resolution
chain, [o x]
- * then after resizing, it will first be out of place again,
[ xo ]
- * and in order to transfer it where necessary,
- * after transferring all the elements from the old halves you
need to [ o x ]
- * process tail from the collision resolution chain immediately
after it [ o x ]
- */
- for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i)
- reinsert(buf[i], buf[i].get_hash(*this));
-
-#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
- watch.stop();
- std::cerr << std::fixed << std::setprecision(3) << "Resize from " <<
old_size << " to "
- << grower.buf_size() << " took " << watch.elapsedSeconds()
<< " sec."
- << std::endl;
-#endif
- }
-
/** Paste into the new buffer the value that was in the old buffer.
* Used when increasing the buffer size.
*/
@@ -684,6 +644,8 @@ public:
std::swap(buf, rhs.buf);
std::swap(m_size, rhs.m_size);
std::swap(grower, rhs.grower);
+ // Carry the partition state along with the buffer and grower it belongs to.
+ std::swap(_need_partition, rhs._need_partition);
+ std::swap(_partitioned_threshold, rhs._partitioned_threshold);
Hash::operator=(std::move(rhs));
Allocator::operator=(std::move(rhs));
@@ -816,10 +778,12 @@ protected:
throw;
}
- // The hash table was rehashed, so we have to re-find the key.
- size_t new_place = find_cell(key, hash_value,
grower.place(hash_value));
- assert(!buf[new_place].is_zero(*this));
- it = &buf[new_place];
+ // If resize() set _need_partition it returned without reallocating `buf`, so `it` still
+ // points at the freshly inserted cell; the owning PartitionedHashTable is expected to
+ // convert this table and re-find the key itself.
+ if (LIKELY(!_need_partition)) {
+ // The hash table was rehashed, so we have to re-find the key.
+ size_t new_place = find_cell(key, hash_value,
grower.place(hash_value));
+ assert(!buf[new_place].is_zero(*this));
+ it = &buf[new_place];
+ }
}
}
@@ -853,10 +817,12 @@ protected:
throw;
}
- // The hash table was rehashed, so we have to re-find the key.
- size_t new_place = find_cell(key, hash_value,
grower.place(hash_value));
- assert(!buf[new_place].is_zero(*this));
- it = &buf[new_place];
+ // If resize() set _need_partition it returned without reallocating `buf`, so `it` still
+ // points at the freshly inserted cell; the owning PartitionedHashTable is expected to
+ // convert this table and re-find the key itself.
+ if (LIKELY(!_need_partition)) {
+ // The hash table was rehashed, so we have to re-find the key.
+ size_t new_place = find_cell(key, hash_value,
grower.place(hash_value));
+ assert(!buf[new_place].is_zero(*this));
+ it = &buf[new_place];
+ }
}
}
@@ -1076,7 +1042,9 @@ public:
float get_factor() const { return MAX_BUCKET_OCCUPANCY_FRACTION; }
- bool should_be_shrink(int64_t valid_row) { return valid_row < get_factor()
* (size() / 2.0); }
+    /// True when `valid_row` is below get_factor() times half of size().
+    bool should_be_shrink(int64_t valid_row) const {
+        const double half_size = size() / 2.0;
+        return valid_row < get_factor() * half_size;
+    }
void init_buf_size(size_t reserve_for_num_elements) {
free();
@@ -1118,4 +1086,69 @@ public:
#ifdef DBMS_HASH_MAP_COUNT_COLLISIONS
size_t getCollisions() const { return collisions; }
#endif
+
+private:
+ /// Increase the size of the buffer.
+ ///
+ /// The target size comes from `for_num_elems` if non-zero, else from `for_buf_size` if
+ /// non-zero, else from the grower's own growth step. Nothing happens when the target buffer
+ /// would not be larger than the current one.
+ /// If the target bucket count reaches `_partitioned_threshold` (see check_if_need_partition()),
+ /// the buffer is left untouched and `_need_partition` is set instead.
+ void resize(size_t for_num_elems = 0, size_t for_buf_size = 0) {
+ SCOPED_RAW_TIMER(&_resize_timer_ns);
+#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
+ Stopwatch watch;
+#endif
+
+ size_t old_size = grower.buf_size();
+
+ /** In case of exception for the object to remain in the correct state,
+ * changing the variable `grower` (which determines the buffer size of the hash table)
+ * is postponed for a moment after a real buffer change.
+ * The temporary variable `new_grower` is used to determine the new size.
+ */
+ Grower new_grower = grower;
+ if (for_num_elems) {
+ new_grower.set(for_num_elems);
+ if (new_grower.buf_size() <= old_size) return;
+ } else if (for_buf_size) {
+ new_grower.set_buf_size(for_buf_size);
+ if (new_grower.buf_size() <= old_size) return;
+ } else
+ new_grower.increase_size();
+
+ // The new bucket count reaches the partitioned hash table threshold: don't resize, just
+ // set the need-partition flag. `buf` is not reallocated, so existing cell pointers stay valid.
+ if (check_if_need_partition(new_grower.buf_size())) {
+ _need_partition = true;
+ return;
+ }
+
+ /// Expand the space.
+ buf = reinterpret_cast<Cell*>(Allocator::realloc(buf,
get_buffer_size_in_bytes(),
+ new_grower.buf_size()
* sizeof(Cell)));
+ grower = new_grower;
+
+ /** Now some items may need to be moved to a new location.
+ * The element can stay in place, or move to a new location "on the right",
+ * or move to the left of the collision resolution chain, because the elements to the
+ * left of it have been moved to the new "right" location.
+ */
+ size_t i = 0;
+ for (; i < old_size; ++i)
+ if (!buf[i].is_zero(*this) && !buf[i].is_deleted())
+ reinsert(buf[i], buf[i].get_hash(*this));
+
+ /** There is also a special case:
+ * if the element was to be at the end of the old buffer, [ x]
+ * but is at the beginning because of the collision resolution chain, [o x]
+ * then after resizing, it will first be out of place again, [ xo ]
+ * and in order to transfer it where necessary,
+ * after transferring all the elements from the old halves you need to [ o x ]
+ * process tail from the collision resolution chain immediately after it [ o x ]
+ */
+ for (; !buf[i].is_zero(*this) && !buf[i].is_deleted(); ++i)
+ reinsert(buf[i], buf[i].get_hash(*this));
+
+#ifdef DBMS_HASH_MAP_DEBUG_RESIZES
+ watch.stop();
+ std::cerr << std::fixed << std::setprecision(3) << "Resize from " <<
old_size << " to "
+ << grower.buf_size() << " took " << watch.elapsedSeconds()
<< " sec."
+ << std::endl;
+#endif
+ }
};
diff --git a/be/src/vec/common/hash_table/partitioned_hash_map.h
b/be/src/vec/common/hash_table/partitioned_hash_map.h
new file mode 100644
index 0000000000..fabf61b27a
--- /dev/null
+++ b/be/src/vec/common/hash_table/partitioned_hash_map.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/TwoLevelHashMap.h
+// and modified by Doris
+#pragma once
+
+#include "vec/common/hash_table/hash_map.h"
+#include "vec/common/hash_table/partitioned_hash_table.h"
+
+/// Hash map on top of PartitionedHashTable: adds operator[] and map-flavoured aliases.
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+class PartitionedHashMapTable
+        : public PartitionedHashTable<Key, Cell, Hash, Grower, Allocator,
+                                      ImplTable<Key, Cell, Hash, Grower, Allocator>> {
+public:
+    using Impl = ImplTable<Key, Cell, Hash, Grower, Allocator>;
+    using Base = PartitionedHashTable<Key, Cell, Hash, Grower, Allocator, Impl>;
+    using LookupResult = typename Impl::LookupResult;
+
+    using Base::Base;
+    using Base::prefetch;
+
+    using mapped_type = typename Cell::Mapped;
+
+    /// Return the value mapped to `x`; a value-initialized mapped_type is constructed in place
+    /// first when `x` was not present.
+    typename Cell::Mapped& ALWAYS_INLINE operator[](const Key& x) {
+        LookupResult slot;
+        bool is_new_key;
+        this->emplace(x, slot, is_new_key);
+
+        auto* mapped = lookup_result_get_mapped(slot);
+        if (is_new_key) {
+            new (mapped) mapped_type();
+        }
+        return *mapped;
+    }
+};
+
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+using PartitionedHashMap =
+        PartitionedHashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash, Grower, Allocator,
+                                ImplTable>;
+
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
+          typename Grower = PartitionedHashTableGrower<>, typename Allocator = HashTableAllocator,
+          template <typename...> typename ImplTable = HashMapTable>
+using PartitionedHashMapWithSavedHash =
+        PartitionedHashMapTable<Key, HashMapCellWithSavedHash<Key, Mapped, Hash>, Hash, Grower,
+                                Allocator, ImplTable>;
diff --git a/be/src/vec/common/hash_table/partitioned_hash_table.h
b/be/src/vec/common/hash_table/partitioned_hash_table.h
new file mode 100644
index 0000000000..d0cdc25f89
--- /dev/null
+++ b/be/src/vec/common/hash_table/partitioned_hash_table.h
@@ -0,0 +1,551 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+//
https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/HashTable/TwoLevelHashTable.h
+// and modified by Doris
+#pragma once
+
+#include "vec/common/hash_table/hash_table.h"
+
+/** Partitioned hash table.
+ * Starts out as a single (level 0) hash table. When that table would have to grow to a bucket
+ * count at or above the threshold passed to set_partitioned_threshold() (a value > 0 enables
+ * this), it is converted into 16 (or 1ULL << BITS_FOR_SUB_TABLE) smaller level 1 sub tables.
+ * To determine which sub table a key belongs to, BITS_FOR_SUB_TABLE bits of its hash value are
+ * used (the bits just below bit 32).
+ *
+ * Usually works a little slower than a simple hash table.
+ * However, once partitioned it has advantages in some cases:
+ * - if you need to merge two hash tables together, then you can easily parallelize it by sub tables;
+ * - delay during resizes is amortized, since the small hash tables are resized separately;
+ * - in theory, resizes are cache-local in a larger range of sizes.
+ */
+
+template <size_t initial_size_degree = 8>
+struct PartitionedHashTableGrower
+        : public HashTableGrowerWithPrecalculation<initial_size_degree> {
+    /// Grow the table: the size degree goes up by 2 while it is below 15 and by 1 afterwards
+    /// (presumably 4x and 2x more cells respectively, if the degree is log2 of the cell count).
+    void increase_size() {
+        if (this->size_degree() < 15) {
+            this->increase_size_degree(2);
+        } else {
+            this->increase_size_degree(1);
+        }
+    }
+};
+
+template <typename Key, typename Cell, typename Hash, typename Grower, typename Allocator,
+          typename ImplTable = HashTable<Key, Cell, Hash, Grower, Allocator>,
+          size_t BITS_FOR_SUB_TABLE = 4>
+class PartitionedHashTable : private boost::noncopyable,
+                             protected Hash /// empty base optimization
+{
+public:
+    using Impl = ImplTable;
+
+    using key_type = typename Impl::key_type;
+    using mapped_type = typename Impl::mapped_type;
+    using value_type = typename Impl::value_type;
+    using cell_type = typename Impl::cell_type;
+
+    using LookupResult = typename Impl::LookupResult;
+    using ConstLookupResult = typename Impl::ConstLookupResult;
+
+protected:
+    using HashValue = size_t;
+    using Self = PartitionedHashTable;
+
+private:
+    static constexpr size_t NUM_LEVEL1_SUB_TABLES = 1ULL << BITS_FOR_SUB_TABLE;
+    static constexpr size_t MAX_SUB_TABLE = NUM_LEVEL1_SUB_TABLES - 1;
+
+    //factor that will trigger growing the hash table on insert.
+    static constexpr float MAX_SUB_TABLE_OCCUPANCY_FRACTION = 0.5f;
+
+    /// The only table in use until the conversion to partitioned mode.
+    Impl level0_sub_table;
+    /// The tables in use after the conversion; a key's table is chosen by get_sub_table_from_hash().
+    Impl level1_sub_tables[NUM_LEVEL1_SUB_TABLES];
+
+    /// Whether level1_sub_tables (true) or level0_sub_table (false) holds the data.
+    bool _is_partitioned = false;
+
+    /// Total time spent in convert_to_partitioned(); it is never reset.
+    int64_t _convert_timer_ns = 0;
+
+public:
+    PartitionedHashTable() = default;
+
+    PartitionedHashTable(PartitionedHashTable&& rhs) { *this = std::move(rhs); }
+
+    /// Swap the partition flag and convert timer with `rhs`, and move-assign every sub table.
+    PartitionedHashTable& operator=(PartitionedHashTable&& rhs) {
+        std::swap(_is_partitioned, rhs._is_partitioned);
+        std::swap(_convert_timer_ns, rhs._convert_timer_ns);
+
+        level0_sub_table = std::move(rhs.level0_sub_table);
+        for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+            level1_sub_tables[i] = std::move(rhs.level1_sub_tables[i]);
+        }
+
+        Hash::operator=(std::move(rhs));
+        return *this;
+    }
+
+    size_t hash(const Key& x) const { return Hash::operator()(x); }
+
+    float get_factor() const { return MAX_SUB_TABLE_OCCUPANCY_FRACTION; }
+
+    int64_t get_convert_timer_value() const { return _convert_timer_ns; }
+
+    /// A partitioned table is never reported as shrinkable.
+    bool should_be_shrink(int64_t valid_row) const {
+        if (_is_partitioned) {
+            return false;
+        } else {
+            return level0_sub_table.should_be_shrink(valid_row);
+        }
+    }
+
+    template <typename Func>
+    void ALWAYS_INLINE for_each_value(Func&& func) {
+        if (_is_partitioned) {
+            for (auto i = 0u; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                level1_sub_tables[i].for_each_value(func);
+            }
+        } else {
+            level0_sub_table.for_each_value(func);
+        }
+    }
+
+    /// Sum of Impl::get_size() over the table(s) currently in use, so that both modes report
+    /// the same quantity.
+    size_t get_size() {
+        if (!_is_partitioned) {
+            return level0_sub_table.get_size();
+        }
+        size_t count = 0;
+        for (auto& impl : level1_sub_tables) {
+            count += impl.get_size();
+        }
+        return count;
+    }
+
+    /// Allocate buffers for `reserve_for_num_elements` elements; switches straight to
+    /// partitioned mode when level 0 alone would reach the partition threshold.
+    void init_buf_size(size_t reserve_for_num_elements) {
+        if (_is_partitioned) {
+            for (auto& impl : level1_sub_tables) {
+                impl.init_buf_size(reserve_for_num_elements / NUM_LEVEL1_SUB_TABLES);
+            }
+        } else {
+            if (level0_sub_table.check_if_need_partition(reserve_for_num_elements)) {
+                level0_sub_table.clear_and_shrink();
+                _is_partitioned = true;
+
+                for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                    level1_sub_tables[i].init_buf_size(reserve_for_num_elements /
+                                                       NUM_LEVEL1_SUB_TABLES);
+                }
+            } else {
+                level0_sub_table.init_buf_size(reserve_for_num_elements);
+            }
+        }
+    }
+
+    void delete_zero_key(Key key) {
+        if (_is_partitioned) {
+            const auto key_hash = hash(key);
+            size_t sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].delete_zero_key(key);
+        } else {
+            level0_sub_table.delete_zero_key(key);
+        }
+    }
+
+    size_t get_buffer_size_in_bytes() const {
+        if (_is_partitioned) {
+            size_t buff_size = 0;
+            for (const auto& impl : level1_sub_tables) buff_size += impl.get_buffer_size_in_bytes();
+            return buff_size;
+        } else {
+            return level0_sub_table.get_buffer_size_in_bytes();
+        }
+    }
+
+    size_t get_buffer_size_in_cells() const {
+        if (_is_partitioned) {
+            size_t buff_size = 0;
+            for (const auto& impl : level1_sub_tables) buff_size += impl.get_buffer_size_in_cells();
+            return buff_size;
+        } else {
+            return level0_sub_table.get_buffer_size_in_cells();
+        }
+    }
+
+    /// Per-table buffer sizes: one entry per level 1 sub table, or a single level 0 entry.
+    std::vector<size_t> get_buffer_sizes_in_cells() const {
+        std::vector<size_t> sizes;
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                sizes.push_back(level1_sub_tables[i].get_buffer_size_in_cells());
+            }
+        } else {
+            sizes.push_back(level0_sub_table.get_buffer_size_in_cells());
+        }
+        return sizes;
+    }
+
+    void reset_resize_timer() {
+        if (_is_partitioned) {
+            for (auto& impl : level1_sub_tables) {
+                impl.reset_resize_timer();
+            }
+        } else {
+            level0_sub_table.reset_resize_timer();
+        }
+    }
+    int64_t get_resize_timer_value() const {
+        if (_is_partitioned) {
+            int64_t resize_timer_ns = 0;
+            for (const auto& impl : level1_sub_tables) {
+                resize_timer_ns += impl.get_resize_timer_value();
+            }
+            return resize_timer_ns;
+        } else {
+            return level0_sub_table.get_resize_timer_value();
+        }
+    }
+
+protected:
+    /// Advance `sub_table_idx` to the first non-empty level 1 sub table and return its begin();
+    /// if there is none, leave the index at MAX_SUB_TABLE and return that table's end().
+    typename Impl::iterator begin_of_next_non_empty_sub_table_idx(size_t& sub_table_idx) {
+        while (sub_table_idx != NUM_LEVEL1_SUB_TABLES && level1_sub_tables[sub_table_idx].empty())
+            ++sub_table_idx;
+
+        if (sub_table_idx != NUM_LEVEL1_SUB_TABLES) return level1_sub_tables[sub_table_idx].begin();
+
+        --sub_table_idx;
+        return level1_sub_tables[MAX_SUB_TABLE].end();
+    }
+
+    typename Impl::const_iterator begin_of_next_non_empty_sub_table_idx(
+            size_t& sub_table_idx) const {
+        while (sub_table_idx != NUM_LEVEL1_SUB_TABLES && level1_sub_tables[sub_table_idx].empty())
+            ++sub_table_idx;
+
+        if (sub_table_idx != NUM_LEVEL1_SUB_TABLES) return level1_sub_tables[sub_table_idx].begin();
+
+        --sub_table_idx;
+        return level1_sub_tables[MAX_SUB_TABLE].end();
+    }
+
+public:
+    void set_partitioned_threshold(int threshold) {
+        level0_sub_table.set_partitioned_threshold(threshold);
+    }
+
+    class const_iterator;
+
+    /// In unpartitioned mode sub_table_idx is NUM_LEVEL1_SUB_TABLES and current_it walks
+    /// level0_sub_table; otherwise it walks the level 1 sub tables in order.
+    class iterator /// NOLINT
+    {
+        Self* container {};
+        size_t sub_table_idx {};
+        typename Impl::iterator current_it {};
+
+        friend class PartitionedHashTable;
+        // const_iterator's converting constructor reads the private members above.
+        friend class const_iterator;
+
+        iterator(Self* container_, size_t sub_table_idx_, typename Impl::iterator current_it_)
+                : container(container_), sub_table_idx(sub_table_idx_), current_it(current_it_) {}
+
+    public:
+        iterator() = default;
+
+        bool operator==(const iterator& rhs) const {
+            return sub_table_idx == rhs.sub_table_idx && current_it == rhs.current_it;
+        }
+        bool operator!=(const iterator& rhs) const { return !(*this == rhs); }
+
+        iterator& operator++() {
+            ++current_it;
+            if (container->_is_partitioned) {
+                if (current_it == container->level1_sub_tables[sub_table_idx].end()) {
+                    ++sub_table_idx;
+                    current_it = container->begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+                }
+            }
+
+            return *this;
+        }
+
+        Cell& operator*() const { return *current_it; }
+        Cell* operator->() const { return current_it.get_ptr(); }
+
+        Cell* get_ptr() const { return current_it.get_ptr(); }
+        size_t get_hash() const { return current_it.get_hash(); }
+    };
+
+    class const_iterator /// NOLINT
+    {
+        // Points to const: const_iterator is created from `this` inside const member functions.
+        const Self* container {};
+        size_t sub_table_idx {};
+        typename Impl::const_iterator current_it {};
+
+        friend class PartitionedHashTable;
+
+        const_iterator(const Self* container_, size_t sub_table_idx_,
+                       typename Impl::const_iterator current_it_)
+                : container(container_), sub_table_idx(sub_table_idx_), current_it(current_it_) {}
+
+    public:
+        const_iterator() = default;
+        const_iterator(const iterator& rhs) /// NOLINT
+                : container(rhs.container),
+                  sub_table_idx(rhs.sub_table_idx),
+                  current_it(rhs.current_it) {}
+
+        bool operator==(const const_iterator& rhs) const {
+            return sub_table_idx == rhs.sub_table_idx && current_it == rhs.current_it;
+        }
+        bool operator!=(const const_iterator& rhs) const { return !(*this == rhs); }
+
+        const_iterator& operator++() {
+            ++current_it;
+            if (container->_is_partitioned) {
+                if (current_it == container->level1_sub_tables[sub_table_idx].end()) {
+                    ++sub_table_idx;
+                    current_it = container->begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+                }
+            }
+
+            return *this;
+        }
+
+        const Cell& operator*() const { return *current_it; }
+        const Cell* operator->() const { return current_it.get_ptr(); }
+
+        const Cell* get_ptr() const { return current_it.get_ptr(); }
+        size_t get_hash() const { return current_it.get_hash(); }
+    };
+
+    const_iterator begin() const {
+        if (_is_partitioned) {
+            size_t sub_table_idx = 0;
+            typename Impl::const_iterator impl_it =
+                    begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+            return {this, sub_table_idx, impl_it};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.begin()};
+        }
+    }
+
+    iterator begin() {
+        if (_is_partitioned) {
+            size_t sub_table_idx = 0;
+            typename Impl::iterator impl_it = begin_of_next_non_empty_sub_table_idx(sub_table_idx);
+            return {this, sub_table_idx, impl_it};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.begin()};
+        }
+    }
+
+    const_iterator end() const {
+        if (_is_partitioned) {
+            return {this, MAX_SUB_TABLE, level1_sub_tables[MAX_SUB_TABLE].end()};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.end()};
+        }
+    }
+    iterator end() {
+        if (_is_partitioned) {
+            return {this, MAX_SUB_TABLE, level1_sub_tables[MAX_SUB_TABLE].end()};
+        } else {
+            return {this, NUM_LEVEL1_SUB_TABLES, level0_sub_table.end()};
+        }
+    }
+
+    /// Insert a value. In the case of any more complex values, it is better to use the `emplace` function.
+    std::pair<LookupResult, bool> ALWAYS_INLINE insert(const value_type& x) {
+        size_t hash_value = hash(Cell::get_key(x));
+
+        std::pair<LookupResult, bool> res;
+        emplace(Cell::get_key(x), res.first, res.second, hash_value);
+
+        if (res.second) insert_set_mapped(lookup_result_get_mapped(res.first), x);
+
+        return res;
+    }
+
+    /// Make room for `num_elem` more elements (split evenly across sub tables when partitioned).
+    void expanse_for_add_elem(size_t num_elem) {
+        if (_is_partitioned) {
+            size_t num_elem_per_sub_table =
+                    (num_elem + NUM_LEVEL1_SUB_TABLES - 1) / NUM_LEVEL1_SUB_TABLES;
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                level1_sub_tables[i].expanse_for_add_elem(num_elem_per_sub_table);
+            }
+        } else {
+            level0_sub_table.expanse_for_add_elem(num_elem);
+            if (UNLIKELY(level0_sub_table.need_partition())) {
+                convert_to_partitioned();
+                // Level 0 refused to grow (it set need_partition instead) and the sub tables
+                // were sized from its old buffer, so reserve the requested room in them now.
+                // _is_partitioned is true at this point, so this takes the branch above.
+                expanse_for_add_elem(num_elem);
+            }
+        }
+    }
+
+    template <typename KeyHolder>
+    void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
+        if (_is_partitioned) {
+            const auto& key = key_holder_get_key(key_holder);
+            const auto key_hash = hash(key);
+            const auto sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].prefetch(key_holder);
+        } else {
+            level0_sub_table.prefetch(key_holder);
+        }
+    }
+
+    template <bool READ>
+    void ALWAYS_INLINE prefetch_by_hash(size_t hash_value) {
+        if (_is_partitioned) {
+            const auto sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].template prefetch_by_hash<READ>(hash_value);
+        } else {
+            level0_sub_table.template prefetch_by_hash<READ>(hash_value);
+        }
+    }
+
+    template <bool READ, typename KeyHolder>
+    void ALWAYS_INLINE prefetch(KeyHolder& key_holder) {
+        if (_is_partitioned) {
+            const auto& key = key_holder_get_key(key_holder);
+            const auto key_hash = hash(key);
+            const auto sub_table_idx = get_sub_table_from_hash(key_hash);
+            level1_sub_tables[sub_table_idx].template prefetch<READ>(key_holder);
+        } else {
+            level0_sub_table.template prefetch<READ>(key_holder);
+        }
+    }
+
+    /** Insert the key,
+      * return an iterator to a position that can be used for `placement new` of value,
+      * as well as the flag - whether a new key was inserted.
+      *
+      * You have to make `placement new` values if you inserted a new key,
+      * since when destroying a hash table, the destructor will be invoked for it!
+      *
+      * Example usage:
+      *
+      * Map::iterator it;
+      * bool inserted;
+      * map.emplace(key, it, inserted);
+      * if (inserted)
+      *     new(&it->second) Mapped(value);
+      */
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted) {
+        size_t hash_value = hash(key_holder_get_key(key_holder));
+        emplace(key_holder, it, inserted, hash_value);
+    }
+
+    /// Same, but with a precalculated values of hash function.
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, bool& inserted,
+                               size_t hash_value) {
+        if (_is_partitioned) {
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].emplace(key_holder, it, inserted, hash_value);
+        } else {
+            level0_sub_table.emplace(key_holder, it, inserted, hash_value);
+            if (UNLIKELY(level0_sub_table.need_partition())) {
+                convert_to_partitioned();
+
+                // The hash table was converted to partitioned, so we have to re-find the key.
+                size_t sub_table_id = get_sub_table_from_hash(hash_value);
+                it = level1_sub_tables[sub_table_id].find(key_holder_get_key(key_holder),
+                                                          hash_value);
+            }
+        }
+    }
+
+    template <typename KeyHolder>
+    void ALWAYS_INLINE emplace(KeyHolder&& key_holder, LookupResult& it, size_t hash_value,
+                               bool& inserted) {
+        emplace(key_holder, it, inserted, hash_value);
+    }
+
+    LookupResult ALWAYS_INLINE find(Key x, size_t hash_value) {
+        if (_is_partitioned) {
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            return level1_sub_tables[sub_table_idx].find(x, hash_value);
+        } else {
+            return level0_sub_table.find(x, hash_value);
+        }
+    }
+
+    ConstLookupResult ALWAYS_INLINE find(Key x, size_t hash_value) const {
+        return const_cast<std::decay_t<decltype(*this)>*>(this)->find(x, hash_value);
+    }
+
+    LookupResult ALWAYS_INLINE find(Key x) { return find(x, hash(x)); }
+
+    ConstLookupResult ALWAYS_INLINE find(Key x) const { return find(x, hash(x)); }
+
+    size_t size() const {
+        if (_is_partitioned) {
+            size_t res = 0;
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) res += level1_sub_tables[i].size();
+            return res;
+        } else {
+            return level0_sub_table.size();
+        }
+    }
+
+    /// Per-table element counts: one entry per level 1 sub table, or a single level 0 entry.
+    std::vector<size_t> sizes() const {
+        std::vector<size_t> sizes;
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+                sizes.push_back(level1_sub_tables[i].size());
+            }
+        } else {
+            sizes.push_back(level0_sub_table.size());
+        }
+        return sizes;
+    }
+
+    bool empty() const {
+        if (_is_partitioned) {
+            for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i)
+                if (!level1_sub_tables[i].empty()) return false;
+            return true;
+        } else {
+            return level0_sub_table.empty();
+        }
+    }
+
+private:
+    /// Move every element of level0_sub_table into the level 1 sub tables (chosen by hash),
+    /// then release level 0's buffer.
+    void convert_to_partitioned() {
+        SCOPED_RAW_TIMER(&_convert_timer_ns);
+
+        // Switch modes before moving anything: insert() below must route the zero key into a
+        // level 1 sub table. With the flag still false it would go back into level0_sub_table,
+        // whose need_partition() is still set, and re-enter this function endlessly.
+        _is_partitioned = true;
+
+        auto bucket_count = level0_sub_table.get_buffer_size_in_cells();
+        for (size_t i = 0; i < NUM_LEVEL1_SUB_TABLES; ++i) {
+            level1_sub_tables[i] = Impl(bucket_count / NUM_LEVEL1_SUB_TABLES);
+        }
+
+        auto it = level0_sub_table.begin();
+
+        /// It is assumed that the zero key (stored separately) is first in iteration order.
+        if (it != level0_sub_table.end() && it.get_ptr()->is_zero(level0_sub_table)) {
+            insert(it->get_value());
+            ++it;
+        }
+
+        for (; it != level0_sub_table.end(); ++it) {
+            const Cell* cell = it.get_ptr();
+            size_t hash_value = cell->get_hash(level0_sub_table);
+            size_t sub_table_idx = get_sub_table_from_hash(hash_value);
+            level1_sub_tables[sub_table_idx].insert_unique_non_zero(cell, hash_value);
+        }
+
+        level0_sub_table.clear_and_shrink();
+    }
+
+    /// Pick a level 1 sub table from the BITS_FOR_SUB_TABLE bits just below bit 32 of the hash.
+    /// NOTE Bad for hash tables with more than 2^32 cells.
+    static size_t get_sub_table_from_hash(size_t hash_value) {
+        return (hash_value >> (32 - BITS_FOR_SUB_TABLE)) & MAX_SUB_TABLE;
+    }
+};
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 79beb01a83..da144378b1 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -73,20 +73,37 @@ struct ProcessHashTableBuild {
Defer defer {[&]() {
int64_t bucket_size =
hash_table_ctx.hash_table.get_buffer_size_in_cells();
+ int64_t filled_bucket_size = hash_table_ctx.hash_table.size();
int64_t bucket_bytes =
hash_table_ctx.hash_table.get_buffer_size_in_bytes();
_join_node->_mem_used += bucket_bytes - old_bucket_bytes;
COUNTER_SET(_join_node->_build_buckets_counter, bucket_size);
+ COUNTER_SET(_join_node->_build_buckets_fill_counter,
filled_bucket_size);
+
+        // Render per-table numbers as "a, b, c, " for the profile info strings.
+        auto to_info_string = [](const auto& values) {
+            std::string info;
+            for (auto value : values) {
+                info += std::to_string(value) + ", ";
+            }
+            return info;
+        };
+        _join_node->add_hash_buckets_info(
+                to_info_string(hash_table_ctx.hash_table.get_buffer_sizes_in_cells()));
+        _join_node->add_hash_buckets_filled_info(
+                to_info_string(hash_table_ctx.hash_table.sizes()));
}};
KeyGetter key_getter(_build_raw_ptrs, _join_node->_build_key_sz,
nullptr);
SCOPED_TIMER(_join_node->_build_table_insert_timer);
+ // Reset before expanse_for_add_elem() below, so that any resize it triggers is also
+ // included in the BuildTableExpanseTime counter updated at the end of this build step.
+ hash_table_ctx.hash_table.reset_resize_timer();
+
// only not build_unique, we need expanse hash table before insert data
if (!_join_node->_build_unique) {
// _rows contains null row, which will cause hash table resize to
be large.
RETURN_IF_CATCH_BAD_ALLOC(hash_table_ctx.hash_table.expanse_for_add_elem(_rows));
}
- hash_table_ctx.hash_table.reset_resize_timer();
vector<int>& inserted_rows =
_join_node->_inserted_rows[&_acquired_block];
bool has_runtime_filter = !_join_node->_runtime_filter_descs.empty();
@@ -172,6 +189,9 @@ struct ProcessHashTableBuild {
COUNTER_UPDATE(_join_node->_build_table_expanse_timer,
hash_table_ctx.hash_table.get_resize_timer_value());
+ COUNTER_UPDATE(_join_node->_build_table_convert_timer,
+ hash_table_ctx.hash_table.get_convert_timer_value());
+
return Status::OK();
}
@@ -337,6 +357,8 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_build_table_insert_timer = ADD_TIMER(build_phase_profile,
"BuildTableInsertTime");
_build_expr_call_timer = ADD_TIMER(build_phase_profile,
"BuildExprCallTime");
_build_table_expanse_timer = ADD_TIMER(build_phase_profile,
"BuildTableExpanseTime");
+ _build_table_convert_timer =
+ ADD_TIMER(build_phase_profile,
"BuildTableConvertToPartitionedTime");
_build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows",
TUnit::UNIT);
_build_side_compute_hash_timer = ADD_TIMER(build_phase_profile,
"BuildSideHashComputingTime");
@@ -355,6 +377,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
_push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
_build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets",
TUnit::UNIT);
+ _build_buckets_fill_counter = ADD_COUNTER(runtime_profile(),
"FilledBuckets", TUnit::UNIT);
if (_is_broadcast_join) {
runtime_profile()->add_info_string("BroadcastJoin", "true");
@@ -374,13 +397,21 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_left_table_data_types =
VectorizedUtils::get_data_types(child(0)->row_desc());
// Hash Table Init
- _hash_table_init();
+ _hash_table_init(state);
_process_hashtable_ctx_variants_init(state);
_construct_mutable_join_block();
return Status::OK();
}
+void HashJoinNode::add_hash_buckets_info(const std::string& info) {
+ runtime_profile()->add_info_string("HashTableBuckets", info);
+}
+
+void HashJoinNode::add_hash_buckets_filled_info(const std::string& info) {
+ runtime_profile()->add_info_string("HashTableFilledBuckets", info);
+}
+
Status HashJoinNode::close(RuntimeState* state) {
if (is_closed()) {
return Status::OK();
@@ -833,7 +864,7 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block, uin
return st;
}
-void HashJoinNode::_hash_table_init() {
+void HashJoinNode::_hash_table_init(RuntimeState* state) {
std::visit(
[&](auto&& join_op_variants, auto have_other_join_conjunct) {
using JoinOpType = std::decay_t<decltype(join_op_variants)>;
@@ -956,6 +987,16 @@ void HashJoinNode::_hash_table_init() {
_join_op_variants, make_bool_variant(_have_other_join_conjunct));
DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
+
+ std::visit(Overload {[&](std::monostate& arg) {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ __builtin_unreachable();
+ },
+ [&](auto&& arg) {
+ arg.hash_table_ptr->set_partitioned_threshold(
+
state->partitioned_hash_join_rows_threshold());
+ }},
+ *_hash_table_variants);
}
void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 691857c986..72d8317468 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -24,7 +24,7 @@
#include "join_op.h"
#include "process_hash_table_probe.h"
#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h"
+#include "vec/common/hash_table/partitioned_hash_map.h"
#include "vjoin_node_base.h"
namespace doris {
@@ -39,7 +39,7 @@ class SharedHashTableController;
template <typename RowRefListType>
struct SerializedHashTableContext {
using Mapped = RowRefListType;
- using HashTable = HashMap<StringRef, Mapped>;
+ using HashTable = PartitionedHashMap<StringRef, Mapped>;
using State = ColumnsHashing::HashMethodSerialized<typename
HashTable::value_type, Mapped>;
using Iter = typename HashTable::iterator;
@@ -70,7 +70,7 @@ struct
IsSerializedHashTableContextTraits<ColumnsHashing::HashMethodSerialized<V
template <class T, typename RowRefListType>
struct PrimaryTypeHashTableContext {
using Mapped = RowRefListType;
- using HashTable = HashMap<T, Mapped, HashCRC32<T>>;
+ using HashTable = PartitionedHashMap<T, Mapped, HashCRC32<T>>;
using State =
ColumnsHashing::HashMethodOneNumber<typename
HashTable::value_type, Mapped, T, false>;
using Iter = typename HashTable::iterator;
@@ -105,7 +105,7 @@ using I256HashTableContext =
PrimaryTypeHashTableContext<UInt256, RowRefListType
template <class T, bool has_null, typename RowRefListType>
struct FixedKeyHashTableContext {
using Mapped = RowRefListType;
- using HashTable = HashMap<T, Mapped, HashCRC32<T>>;
+ using HashTable = PartitionedHashMap<T, Mapped, HashCRC32<T>>;
using State = ColumnsHashing::HashMethodKeysFixed<typename
HashTable::value_type, T, Mapped,
has_null, false>;
using Iter = typename HashTable::iterator;
@@ -192,6 +192,8 @@ public:
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
Status close(RuntimeState* state) override;
+ void add_hash_buckets_info(const std::string& info);
+ void add_hash_buckets_filled_info(const std::string& info);
private:
using VExprContexts = std::vector<VExprContext*>;
@@ -218,9 +220,11 @@ private:
RuntimeProfile::Counter* _build_expr_call_timer;
RuntimeProfile::Counter* _build_table_insert_timer;
RuntimeProfile::Counter* _build_table_expanse_timer;
+ RuntimeProfile::Counter* _build_table_convert_timer;
RuntimeProfile::Counter* _probe_expr_call_timer;
RuntimeProfile::Counter* _probe_next_timer;
RuntimeProfile::Counter* _build_buckets_counter;
+ RuntimeProfile::Counter* _build_buckets_fill_counter;
RuntimeProfile::Counter* _push_down_timer;
RuntimeProfile::Counter* _push_compute_timer;
RuntimeProfile::Counter* _search_hashtable_timer;
@@ -281,7 +285,7 @@ private:
void _set_build_ignore_flag(Block& block, const std::vector<int>&
res_col_ids);
- void _hash_table_init();
+ void _hash_table_init(RuntimeState* state);
void _process_hashtable_ctx_variants_init(RuntimeState* state);
static constexpr auto _MAX_BUILD_BLOCK_COUNT = 128;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 8ca3656ad4..7d92dabedf 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -121,10 +121,10 @@ Status
SharedHashTableController::release_ref_count_if_need(TUniqueId fragment_i
Status SharedHashTableController::wait_for_closable(RuntimeState* state, int
my_node_id) {
std::unique_lock<std::mutex> lock(_mutex);
- RETURN_IF_CANCELLED(state);
if (!_ref_fragments[my_node_id].empty()) {
_cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); });
}
+ RETURN_IF_CANCELLED(state);
return Status::OK();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 59b653a88f..c8622de800 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -229,6 +229,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String INTERNAL_SESSION = "internal_session";
+ public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD =
"partitioned_hash_join_rows_threshold";
+
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field,
String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -601,6 +603,10 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = INTERNAL_SESSION)
public boolean internalSession = false;
+ // Use partitioned hash join if build side row count >= the threshold . 0
- the threshold is not set.
+ @VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD)
+ public int partitionedHashJoinRowsThreshold = 0;
+
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {
@@ -609,6 +615,7 @@ public class SessionVariable implements Serializable,
Writable {
this.enableLocalExchange = random.nextBoolean();
this.disableJoinReorder = random.nextBoolean();
this.disableStreamPreaggregations = random.nextBoolean();
+ // this.partitionedHashJoinRowsThreshold = random.nextBoolean() ? 8 :
1048576;
}
public String getBlockEncryptionMode() {
@@ -842,6 +849,14 @@ public class SessionVariable implements Serializable,
Writable {
this.enablePartitionCache = enablePartitionCache;
}
+ public int getPartitionedHashJoinRowsThreshold() {
+ return partitionedHashJoinRowsThreshold;
+ }
+
+ public void setPartitionedHashJoinRowsThreshold(int threshold) {
+ this.partitionedHashJoinRowsThreshold = threshold;
+ }
+
// Serialize to thrift object
public boolean getForwardToMaster() {
return forwardToMaster;
@@ -1244,6 +1259,8 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setSkipDeletePredicate(skipDeletePredicate);
+
tResult.setPartitionedHashJoinRowsThreshold(partitionedHashJoinRowsThreshold);
+
return tResult;
}
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index ded68c7458..b725ea9191 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -179,6 +179,8 @@ struct TQueryOptions {
51: optional bool enable_new_shuffle_hash_method
52: optional i32 be_exec_version = 0
+
+ 53: optional i32 partitioned_hash_join_rows_threshold = 0
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]