This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e413a2b8e9 [Opt](vectorized) Use new way to do hash shuffle to speed up
query (#12586)
e413a2b8e9 is described below
commit e413a2b8e914ed13cf1fd6c44e7c6b3cce9c2dd8
Author: HappenLee <[email protected]>
AuthorDate: Thu Sep 15 11:08:04 2022 +0800
[Opt](vectorized) Use new way to do hash shuffle to speed up query (#12586)
---
be/src/util/hash_util.hpp | 13 ++++
be/src/vec/columns/column.h | 21 ++++--
be/src/vec/columns/column_array.cpp | 5 --
be/src/vec/columns/column_array.h | 2 -
be/src/vec/columns/column_complex.h | 5 --
be/src/vec/columns/column_const.cpp | 18 ++++-
be/src/vec/columns/column_const.h | 5 +-
be/src/vec/columns/column_decimal.cpp | 12 +++-
be/src/vec/columns/column_decimal.h | 6 +-
be/src/vec/columns/column_dummy.h | 5 --
be/src/vec/columns/column_nullable.cpp | 18 ++++-
be/src/vec/columns/column_nullable.h | 4 +-
be/src/vec/columns/column_string.cpp | 2 +-
be/src/vec/columns/column_string.h | 13 +++-
be/src/vec/columns/column_vector.cpp | 12 +++-
be/src/vec/columns/column_vector.h | 5 +-
be/src/vec/sink/vdata_stream_sender.cpp | 80 ++++++++++++----------
be/src/vec/sink/vdata_stream_sender.h | 16 +++--
.../java/org/apache/doris/qe/SessionVariable.java | 5 ++
gensrc/thrift/PaloInternalService.thrift | 2 +
20 files changed, 173 insertions(+), 76 deletions(-)
diff --git a/be/src/util/hash_util.hpp b/be/src/util/hash_util.hpp
index 0ebcc6e11b..4ee40a06bc 100644
--- a/be/src/util/hash_util.hpp
+++ b/be/src/util/hash_util.hpp
@@ -33,6 +33,7 @@
#elif __aarch64__
#include <sse2neon.h>
#endif
+#include <xxh3.h>
#include <zlib.h>
#include "gen_cpp/Types_types.h"
@@ -363,6 +364,18 @@ public:
std::hash<T> hasher;
seed ^= hasher(v) + 0x9e3779b9 + (seed << 6) + (seed >> 2);
}
+
+    // Hash `len` bytes starting at `s` with XXH3 (64-bit), mixing in `seed`.
+    // Feeding the previous result back in as `seed` lets callers chain several
+    // values (e.g. several key columns) into one 64-bit hash per row.
+    // NOTE(review): these hashes pick the destination channel for shuffled rows,
+    // so every node must produce identical output for identical input — keep all
+    // nodes on the same xxHash build; confirm before upgrading the library.
+    static xxh_u64 xxHash64WithSeed(const char* s, size_t len, xxh_u64 seed) {
+        const xxh_u64 digest = XXH3_64bits_withSeed(s, len, seed);
+        return digest;
+    }
+
+    // Counterpart of xxHash64WithSeed for a NULL value: hashes the bytes of a
+    // zero `int` together with `seed`, so every NULL mixes in the same constant.
+    static xxh_u64 xxHash64NullWithSeed(xxh_u64 seed) {
+        static const int INT_VALUE = 0;
+        const char* const null_bytes = reinterpret_cast<const char*>(&INT_VALUE);
+        return XXH3_64bits_withSeed(null_bytes, sizeof(int), seed);
+    }
};
} // namespace doris
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 9334c97476..f58cfa8c0e 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -323,23 +323,36 @@ public:
LOG(FATAL) << "deserialize_vec_with_null_map not supported";
}
+ /// TODO: SipHash is slower than city or xx hash, rethink we should have a
new interface
/// Update state of hash function with value of n-th element.
/// On subsequent calls of this method for sequence of column values of
arbitrary types,
/// passed bytes to hash must identify sequence of values unambiguously.
- virtual void update_hash_with_value(size_t n, SipHash& hash) const = 0;
+ /// Default implementation aborts; columns that support SipHash must override it.
+ virtual void update_hash_with_value(size_t n, SipHash& hash) const {
+ LOG(FATAL) << "update_hash_with_value siphash not supported";
+ }
+
+ /// Batch SipHash variant: for every row i of this column, update hashes[i] with
+ /// the value of row i, so callers avoid one virtual call per row.
+ /// null_data == nullptr means every row is hashed. Otherwise it is a per-row flag
+ /// array; ColumnNullable passes its null map here (non-zero flag == NULL row whose
+ /// hash it has already set). NOTE(review): overrides are expected to skip rows with
+ /// a non-zero flag and hash only rows whose flag is 0 — confirm in each override.
+ /// This is the SipHash path; the uint64_t* overload is the xxHash path.
+ /// The default implementation aborts; supporting columns must override it.
+ virtual void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data
= nullptr) const {
+ LOG(FATAL) << "update_hashes_with_value siphash not supported";
+ };
/// Update state of hash function with value of n elements to avoid the
virtual function call
/// null_data to mark whether need to do hash compute, null_data == nullptr
/// means all element need to do hash function, else only *null_data != 0
need to do hash func
- virtual void update_hashes_with_value(std::vector<SipHash>& hash,
+ /// do xxHash here, faster than other sip hash
+ virtual void update_hashes_with_value(uint64_t* __restrict hashes,
const uint8_t* __restrict null_data
= nullptr) const {
- LOG(FATAL) << "update_hashes_with_value not supported";
+ LOG(FATAL) << "update_hashes_with_value xxhash not supported";
};
/// Update state of crc32 hash function with value of n elements to avoid
the virtual function call
/// null_data to mark whether need to do hash compute, null_data == nullptr
/// means all element need to do hash function, else only *null_data != 0
need to do hash func
- virtual void update_crcs_with_value(std::vector<uint32_t>& hash,
PrimitiveType type,
+ virtual void update_crcs_with_value(std::vector<uint64_t>& hash,
PrimitiveType type,
const uint8_t* __restrict null_data =
nullptr) const {
LOG(FATAL) << "update_crcs_with_value not supported";
};
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index f9b1c53467..510b1fb681 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -228,11 +228,6 @@ void ColumnArray::update_hash_with_value(size_t n,
SipHash& hash) const {
for (size_t i = 0; i < array_size; ++i)
get_data().update_hash_with_value(offset + i, hash);
}
-void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
- const uint8_t* __restrict
null_data) const {
- SIP_HASHES_FUNCTION_COLUMN_IMPL();
-}
-
void ColumnArray::insert(const Field& x) {
const Array& array = doris::vectorized::get<const Array&>(x);
size_t size = array.size();
diff --git a/be/src/vec/columns/column_array.h
b/be/src/vec/columns/column_array.h
index 2f8df6d83d..4d8e8f4d3e 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -98,8 +98,6 @@ public:
StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*&
begin) const override;
const char* deserialize_and_insert_from_arena(const char* pos) override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
- void update_hashes_with_value(std::vector<SipHash>& hashes,
- const uint8_t* __restrict null_data) const
override;
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert(const Field& x) override;
void insert_from(const IColumn& src_, size_t n) override;
diff --git a/be/src/vec/columns/column_complex.h
b/be/src/vec/columns/column_complex.h
index d7e99406e5..2a2c21f19b 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -188,11 +188,6 @@ public:
// TODO add hash function
}
- void update_hashes_with_value(std::vector<SipHash>& hash,
- const uint8_t* __restrict null_data) const
override {
- // TODO add hash function
- }
-
[[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
int nan_direction_hint) const override {
LOG(FATAL) << "compare_at not implemented";
diff --git a/be/src/vec/columns/column_const.cpp
b/be/src/vec/columns/column_const.cpp
index 4bd72ccf8d..dc7142693e 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -104,7 +104,7 @@ void
ColumnConst::update_hashes_with_value(std::vector<SipHash>& hashes,
}
}
-void ColumnConst::update_crcs_with_value(std::vector<uint32_t>& hashes,
doris::PrimitiveType type,
+void ColumnConst::update_crcs_with_value(std::vector<uint64_t>& hashes,
doris::PrimitiveType type,
const uint8_t* __restrict null_data)
const {
DCHECK(null_data == nullptr);
DCHECK(hashes.size() == size());
@@ -121,6 +121,22 @@ void
ColumnConst::update_crcs_with_value(std::vector<uint32_t>& hashes, doris::P
}
}
+void ColumnConst::update_hashes_with_value(uint64_t* __restrict hashes,
+                                           const uint8_t* __restrict null_data) const {
+    // A const column is never handed a null map by its caller.
+    DCHECK(null_data == nullptr);
+    // Every row holds the same value, so fetch it once.
+    const auto value = data->get_data_at(0);
+    const auto rows = size();
+    // A null data pointer marks a NULL constant (presumably what get_data_at()
+    // returns for a nullable const holding NULL — confirm).
+    const bool is_null = value.data == nullptr;
+    for (int row = 0; row < rows; ++row) {
+        hashes[row] = is_null ? HashUtil::xxHash64NullWithSeed(hashes[row])
+                              : HashUtil::xxHash64WithSeed(value.data, value.size, hashes[row]);
+    }
+}
+
MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector&
selector) const {
if (s != selector.size()) {
LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of
column ({})",
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 3f4e735780..f001150bee 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -130,9 +130,12 @@ public:
void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const
override;
- void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType
type,
+ void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data) const
override;
+
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const
override;
ColumnPtr replicate(const Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index 9b6470a371..690714b2ba 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -128,7 +128,7 @@ void
ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
}
template <typename T>
-void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint32_t>& hashes,
PrimitiveType type,
+void ColumnDecimal<T>::update_crcs_with_value(std::vector<uint64_t>& hashes,
PrimitiveType type,
const uint8_t* __restrict
null_data) const {
auto s = hashes.size();
DCHECK(s == size());
@@ -160,6 +160,16 @@ void
ColumnDecimal<T>::update_crcs_with_value(std::vector<uint32_t>& hashes, Pri
}
}
+template <typename T>
+void ColumnDecimal<T>::update_hashes_with_value(uint64_t* __restrict hashes,
+                                                const uint8_t* __restrict null_data) const {
+    // Fold each row's raw decimal bytes into hashes[i] with xxHash, seeding with
+    // the current hashes[i] so several key columns chain into one value per row.
+    //
+    // When null_data is non-null it is the null map of an enclosing
+    // ColumnNullable: a non-zero flag marks a NULL row whose hash the caller has
+    // already set, so that row must be left alone. NOTE: the nested value under a
+    // NULL is not assumed to be a fixed default; hashing it could send equal NULL
+    // keys to different channels.
+    const size_t rows = size();
+    if (null_data == nullptr) {
+        // Branch-free loop for the common all-valid case.
+        for (size_t i = 0; i < rows; ++i) {
+            hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[i]),
+                                                   sizeof(T), hashes[i]);
+        }
+        return;
+    }
+    for (size_t i = 0; i < rows; ++i) {
+        if (null_data[i] == 0) {
+            hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[i]),
+                                                   sizeof(T), hashes[i]);
+        }
+    }
+}
+
template <typename T>
void ColumnDecimal<T>::get_permutation(bool reverse, size_t limit, int,
IColumn::Permutation& res) const {
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index 41dba1827d..9957742c0d 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -154,9 +154,11 @@ public:
const uint8_t* null_map) override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
- void update_hashes_with_value(std::vector<SipHash>& hash,
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const
override;
- void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType
type,
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data) const
override;
+ void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
int compare_at(size_t n, size_t m, const IColumn& rhs_, int
nan_direction_hint) const override;
diff --git a/be/src/vec/columns/column_dummy.h
b/be/src/vec/columns/column_dummy.h
index 96547bfd00..957b333189 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -72,11 +72,6 @@ public:
return pos;
}
- void update_hash_with_value(size_t /*n*/, SipHash& /*hash*/) const
override {}
-
- void update_hashes_with_value(std::vector<SipHash>& hashes,
- const uint8_t* __restrict null_data) const
override {};
-
void insert_from(const IColumn&, size_t) override { ++s; }
void insert_range_from(const IColumn& /*src*/, size_t /*start*/, size_t
length) override {
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index b6bf95f449..3d35f24e61 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -25,7 +25,6 @@
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
#include "vec/common/nan_utils.h"
-#include "vec/common/sip_hash.h"
#include "vec/common/typeid_cast.h"
#include "vec/core/sort_block.h"
@@ -73,7 +72,7 @@ void
ColumnNullable::update_hashes_with_value(std::vector<SipHash>& hashes,
}
}
-void ColumnNullable::update_crcs_with_value(std::vector<uint32_t>& hashes,
+void ColumnNullable::update_crcs_with_value(std::vector<uint64_t>& hashes,
doris::PrimitiveType type,
const uint8_t* __restrict
null_data) const {
DCHECK(null_data == nullptr);
@@ -92,6 +91,21 @@ void
ColumnNullable::update_crcs_with_value(std::vector<uint32_t>& hashes,
}
}
+void ColumnNullable::update_hashes_with_value(uint64_t* __restrict hashes,
+                                              const uint8_t* __restrict null_data) const {
+    // A nullable column is not expected to receive an outer null map.
+    DCHECK(null_data == nullptr);
+    const auto rows = size();
+    const auto* __restrict nulls = assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    const bool has_null =
+            doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(nulls), rows) != rows;
+    if (!has_null) {
+        // Fast path: no NULLs, the nested column hashes every row.
+        nested_column->update_hashes_with_value(hashes, nullptr);
+        return;
+    }
+    // Give NULL rows their fixed NULL hash, then let the nested column handle the
+    // rest, passing the null map so it can tell which rows are NULL.
+    for (int i = 0; i < rows; ++i) {
+        if (nulls[i] != 0) {
+            hashes[i] = HashUtil::xxHash64NullWithSeed(hashes[i]);
+        }
+    }
+    nested_column->update_hashes_with_value(hashes, nulls);
+}
+
MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const {
MutableColumnPtr new_nested_col =
get_nested_column().clone_resized(new_size);
auto new_null_map = ColumnUInt8::create();
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 723e88c8b6..2d3d36f7bc 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -163,8 +163,10 @@ public:
void update_hash_with_value(size_t n, SipHash& hash) const override;
void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const
override;
- void update_crcs_with_value(std::vector<uint32_t>& hash, PrimitiveType
type,
+ void update_crcs_with_value(std::vector<uint64_t>& hash, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data) const
override;
void get_extremes(Field& min, Field& max) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override {
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index 9d2d43b48a..87fa8da649 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -111,7 +111,7 @@ void ColumnString::insert_indices_from(const IColumn& src,
const int* indices_be
}
}
-void ColumnString::update_crcs_with_value(std::vector<uint32_t>& hashes,
doris::PrimitiveType type,
+void ColumnString::update_crcs_with_value(std::vector<uint64_t>& hashes,
doris::PrimitiveType type,
const uint8_t* __restrict null_data)
const {
auto s = hashes.size();
DCHECK(s == size());
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 9cae0509cc..8dc597e18c 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -243,9 +243,20 @@ public:
SIP_HASHES_FUNCTION_COLUMN_IMPL();
}
- void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType
type,
+ void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
+    // Fold each row's string bytes into hashes[i] with xxHash, seeded with the
+    // current hashes[i]. When null_data is non-null (the null map of an enclosing
+    // ColumnNullable), rows with a non-zero flag are NULLs whose hash the caller
+    // already set; they are skipped so the value stored under a NULL never
+    // influences the hash.
+    void update_hashes_with_value(uint64_t* __restrict hashes,
+                                  const uint8_t* __restrict null_data) const override {
+        const size_t rows = size();
+        for (size_t i = 0; i < rows; ++i) {
+            if (null_data != nullptr && null_data[i] != 0) {
+                continue;
+            }
+            // Pointer arithmetic instead of &chars[offset] so a trailing empty
+            // string (offset == chars.size()) does not index past the end.
+            const char* str = reinterpret_cast<const char*>(chars.data() + offset_at(i));
+            hashes[i] = HashUtil::xxHash64WithSeed(str, size_at(i), hashes[i]);
+        }
+    }
+
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert_indices_from(const IColumn& src, const int* indices_begin,
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index 1678e5d7f4..e80b4009c6 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -111,6 +111,16 @@ void
ColumnVector<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
SIP_HASHES_FUNCTION_COLUMN_IMPL();
}
+template <typename T>
+void ColumnVector<T>::update_hashes_with_value(uint64_t* __restrict hashes,
+                                               const uint8_t* __restrict null_data) const {
+    // Fold each row's raw value bytes into hashes[i] with xxHash, seeding with the
+    // current hashes[i] so several key columns chain into one value per row.
+    //
+    // When null_data is non-null it is the null map of an enclosing
+    // ColumnNullable: a non-zero flag marks a NULL row whose hash the caller has
+    // already set, so that row must be left alone. NOTE: the nested value under a
+    // NULL is not assumed to be a fixed default; hashing it could send equal NULL
+    // keys to different channels.
+    const size_t rows = size();
+    if (null_data == nullptr) {
+        // Branch-free loop for the common all-valid case.
+        for (size_t i = 0; i < rows; ++i) {
+            hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[i]),
+                                                   sizeof(T), hashes[i]);
+        }
+        return;
+    }
+    for (size_t i = 0; i < rows; ++i) {
+        if (null_data[i] == 0) {
+            hashes[i] = HashUtil::xxHash64WithSeed(reinterpret_cast<const char*>(&data[i]),
+                                                   sizeof(T), hashes[i]);
+        }
+    }
+}
+
template <typename T>
void ColumnVector<T>::sort_column(const ColumnSorter* sorter, EqualFlags&
flags,
IColumn::Permutation& perms, EqualRange&
range,
@@ -119,7 +129,7 @@ void ColumnVector<T>::sort_column(const ColumnSorter*
sorter, EqualFlags& flags,
}
template <typename T>
-void ColumnVector<T>::update_crcs_with_value(std::vector<uint32_t>& hashes,
PrimitiveType type,
+void ColumnVector<T>::update_crcs_with_value(std::vector<uint64_t>& hashes,
PrimitiveType type,
const uint8_t* __restrict
null_data) const {
auto s = hashes.size();
DCHECK(s == size());
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index 447886d08f..020c771057 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -250,9 +250,12 @@ public:
void update_hashes_with_value(std::vector<SipHash>& hashes,
const uint8_t* __restrict null_data) const
override;
- void update_crcs_with_value(std::vector<uint32_t>& hashes, PrimitiveType
type,
+ void update_crcs_with_value(std::vector<uint64_t>& hashes, PrimitiveType
type,
const uint8_t* __restrict null_data) const
override;
+ void update_hashes_with_value(uint64_t* __restrict hashes,
+ const uint8_t* __restrict null_data) const
override;
+
size_t byte_size() const override { return data.size() * sizeof(data[0]); }
size_t allocated_bytes() const override { return data.allocated_bytes(); }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index fc01d8a632..94a4e99163 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -410,6 +410,9 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
shuffle(_channels.begin(), _channels.end(), g);
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
+ if (_state->query_options().__isset.enable_new_shuffle_hash_method) {
+ _new_shuffle_hash_method =
_state->query_options().enable_new_shuffle_hash_method;
+ }
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state,
_row_desc));
} else {
RETURN_IF_ERROR(VExpr::prepare(_partition_expr_ctxs, state,
_row_desc));
@@ -495,7 +498,8 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
current_channel->ch_roll_pb_block();
}
_current_channel_idx = (_current_channel_idx + 1) % _channels.size();
- } else if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ } else if (_part_type == TPartitionType::HASH_PARTITIONED ||
+ _part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
// will only copy schema
// we don't want send temp columns
auto column_to_keep = block->columns();
@@ -506,45 +510,49 @@ Status VDataStreamSender::send(RuntimeState* state,
Block* block) {
// vectorized calculate hash
int rows = block->rows();
- // for each row, we have a siphash val
- std::vector<SipHash> siphashs(rows);
- // result[j] means column index, i means rows index
- for (int j = 0; j < result_size; ++j) {
-
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
- }
-
- // channel2rows' subscript means channel id
- std::vector<vectorized::UInt64> hash_vals(rows);
- for (int i = 0; i < rows; i++) {
- hash_vals[i] = siphashs[i].get64();
- }
+ auto element_size = _channels.size();
+ std::vector<uint64_t> hash_vals(rows);
+ auto* __restrict hashes = hash_vals.data();
+
+ // TODO: after we support new shuffle hash method, should simple the
code
+ if (_part_type == TPartitionType::HASH_PARTITIONED) {
+ if (!_new_shuffle_hash_method) {
+ // for each row, we have a siphash val
+ std::vector<SipHash> siphashs(rows);
+ // result[j] means column index, i means rows index
+ for (int j = 0; j < result_size; ++j) {
+
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
+ }
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = siphashs[i].get64() % element_size;
+ }
+ } else {
+ // result[j] means column index, i means rows index, here to
calculate the xxhash value
+ for (int j = 0; j < result_size; ++j) {
+
block->get_by_position(result[j]).column->update_hashes_with_value(hashes);
+ }
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(channel_add_rows(_channels, _channels.size(),
hash_vals, rows, block));
- } else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
- // will only copy schema
- // we don't want send temp columns
- auto column_to_keep = block->columns();
- // 1. calculate hash
- // 2. dispatch rows to channel
- int result_size = _partition_expr_ctxs.size();
- int result[result_size];
- RETURN_IF_ERROR(get_partition_column_result(block, result));
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = hashes[i] % element_size;
+ }
+ }
- // vectorized calculate hash val
- int rows = block->rows();
- // for each row, we have a hash_val
- std::vector<uint32_t> hash_vals(rows);
+ Block::erase_useless_column(block, column_to_keep);
+ RETURN_IF_ERROR(channel_add_rows(_channels, element_size, hashes,
rows, block));
+ } else {
+ for (int j = 0; j < result_size; ++j) {
+
block->get_by_position(result[j]).column->update_crcs_with_value(
+ hash_vals,
_partition_expr_ctxs[j]->root()->type().type);
+ }
+ element_size = _channel_shared_ptrs.size();
+ for (int i = 0; i < rows; i++) {
+ hashes[i] = hashes[i] % element_size;
+ }
- // result[j] means column index, i means rows index
- for (int j = 0; j < result_size; ++j) {
- block->get_by_position(result[j]).column->update_crcs_with_value(
- hash_vals, _partition_expr_ctxs[j]->root()->type().type);
+ Block::erase_useless_column(block, column_to_keep);
+ RETURN_IF_ERROR(
+ channel_add_rows(_channel_shared_ptrs, element_size,
hashes, rows, block));
}
-
- Block::erase_useless_column(block, column_to_keep);
- RETURN_IF_ERROR(channel_add_rows(_channel_shared_ptrs,
_channel_shared_ptrs.size(),
- hash_vals, rows, block));
} else {
// Range partition
// 1. calculate range
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index e537749dd7..0d44d3152b 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -88,9 +88,9 @@ protected:
return Status::OK();
}
- template <typename Channels, typename HashVals>
- Status channel_add_rows(Channels& channels, int num_channels, const
HashVals& hash_vals,
- int rows, Block* block);
+ template <typename Channels>
+ Status channel_add_rows(Channels& channels, int num_channels, uint64_t*
channel_ids, int rows,
+ Block* block);
struct hash_128 {
uint64_t high;
@@ -152,6 +152,8 @@ protected:
bool _transfer_large_data_by_brpc = false;
segment_v2::CompressionTypePB _compression_type;
+
+ bool _new_shuffle_hash_method = false;
};
// TODO: support local exechange
@@ -311,14 +313,14 @@ private:
bool _enable_local_exchange = false;
};
-template <typename Channels, typename HashVals>
+template <typename Channels>
Status VDataStreamSender::channel_add_rows(Channels& channels, int
num_channels,
- const HashVals& hash_vals, int
rows, Block* block) {
+ uint64_t* __restrict channel_ids,
int rows,
+ Block* block) {
std::vector<int> channel2rows[num_channels];
for (int i = 0; i < rows; i++) {
- auto cid = hash_vals[i] % num_channels;
- channel2rows[cid].emplace_back(i);
+ channel2rows[channel_ids[i]].emplace_back(i);
}
for (int i = 0; i < num_channels; ++i) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index e27f5ff2f7..b8c32effb7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -217,6 +217,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String SKIP_DELETE_PREDICATE = "skip_delete_predicate";
+ public static final String ENABLE_NEW_SHUFFLE_HASH_METHOD =
"enable_new_shuffle_hash_method";
+
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field,
String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -555,6 +557,8 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = ENABLE_FALLBACK_TO_ORIGINAL_PLANNER)
public boolean enableFallbackToOriginalPlanner = true;
+ @VariableMgr.VarAttr(name = ENABLE_NEW_SHUFFLE_HASH_METHOD)
+ public boolean enableNewShffleHashMethod = true;
public String getBlockEncryptionMode() {
return blockEncryptionMode;
@@ -1153,6 +1157,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setEnableFunctionPushdown(enableFunctionPushdown);
tResult.setFragmentTransmissionCompressionCodec(fragmentTransmissionCompressionCodec);
tResult.setEnableLocalExchange(enableLocalExchange);
+ tResult.setEnableNewShuffleHashMethod(enableNewShffleHashMethod);
tResult.setSkipStorageEngineMerge(skipStorageEngineMerge);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index d9911e6141..d25a8f04e9 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -175,6 +175,8 @@ struct TQueryOptions {
// For debug purpose, skip delete predicates when reading data
49: optional bool skip_delete_predicate = false
+
+ 50: optional bool enable_new_shuffle_hash_method = true
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]