This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 54d1630c42 [Opt](vectorized) speed up hash function compute in hash
partition (#12334)
54d1630c42 is described below
commit 54d1630c429ad8222f09a2b718eb29010317605c
Author: HappenLee <[email protected]>
AuthorDate: Wed Sep 7 10:11:40 2022 +0800
[Opt](vectorized) speed up hash function compute in hash partition (#12334)
After optimizing the hash function, the time spent computing SipHash for
HASH_PARTITION in vdata_stream_sender was reduced:
Before: 1s800ms
After: 800ms
---
be/src/vec/columns/column.h | 21 +++++++++++++++++++++
be/src/vec/columns/column_array.cpp | 6 +++++-
be/src/vec/columns/column_array.h | 2 ++
be/src/vec/columns/column_complex.h | 6 ++++++
be/src/vec/columns/column_const.cpp | 17 +++++++++++++++++
be/src/vec/columns/column_const.h | 3 +++
be/src/vec/columns/column_decimal.cpp | 6 ++++++
be/src/vec/columns/column_decimal.h | 3 +++
be/src/vec/columns/column_dummy.h | 3 +++
be/src/vec/columns/column_nullable.cpp | 17 +++++++++++++++++
be/src/vec/columns/column_nullable.h | 2 ++
be/src/vec/columns/column_string.h | 6 ++++++
be/src/vec/columns/column_vector.cpp | 6 ++++++
be/src/vec/columns/column_vector.h | 3 +++
be/src/vec/exprs/vruntimefilter_wrapper.cpp | 1 -
be/src/vec/sink/vdata_stream_sender.cpp | 5 +----
16 files changed, 101 insertions(+), 6 deletions(-)
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index 565a9416d7..fc3ab1478d 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -30,6 +30,19 @@
class SipHash;
+// Shared body for per-column update_hashes_with_value() overrides.
+// Must be expanded inside a column member function that has `hashes`
+// (indexable, with size()) and `null_data` (pointer, may be nullptr) in scope,
+// and where size() and update_hash_with_value(i, hash) resolve to the column's own members.
+// When null_data is nullptr every row i updates hashes[i]; otherwise only rows
+// with null_data[i] == 0 do, and the remaining rows are left untouched.
+// Note: the expansion declares a local named `s` in the enclosing scope.
+#define SIP_HASHES_FUNCTION_COLUMN_IMPL() \
+ auto s = hashes.size(); \
+ DCHECK(s == size()); \
+ if (null_data == nullptr) { \
+ for (size_t i = 0; i < s; i++) { \
+ update_hash_with_value(i, hashes[i]); \
+ } \
+ } else { \
+ for (size_t i = 0; i < s; i++) { \
+ if (null_data[i] == 0) update_hash_with_value(i, hashes[i]); \
+ } \
+ }
+
namespace doris::vectorized {
class Arena;
@@ -286,6 +299,14 @@ public:
/// passed bytes to hash must identify sequence of values unambiguously.
virtual void update_hash_with_value(size_t n, SipHash& hash) const = 0;
+    /// Batch variant of update_hash_with_value: updates hash[i] with the value of row i,
+    /// so the caller makes one virtual call per column instead of one per row.
+    /// null_data is an optional per-row mask: nullptr means every row is hashed;
+    /// otherwise only rows with null_data[i] == 0 are hashed and the others are skipped.
+    /// The default implementation logs a FATAL error; columns that support batch
+    /// hashing must override it.
+    virtual void update_hashes_with_value(std::vector<SipHash>& hash,
+                                          const uint8_t* __restrict null_data = nullptr) const {
+        LOG(FATAL) << "update_hashes_with_value not supported";
+    }
+
/** Removes elements that don't match the filter.
* Is used in WHERE and HAVING operations.
* If result_size_hint > 0, then makes advance reserve(result_size_hint)
for the result column;
diff --git a/be/src/vec/columns/column_array.cpp
b/be/src/vec/columns/column_array.cpp
index c498b72345..db52f62c63 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -219,10 +219,14 @@ void ColumnArray::update_hash_with_value(size_t n,
SipHash& hash) const {
size_t array_size = size_at(n);
size_t offset = offset_at(n);
- hash.update(array_size);
for (size_t i = 0; i < array_size; ++i)
get_data().update_hash_with_value(offset + i, hash);
}
+void ColumnArray::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict
null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
void ColumnArray::insert(const Field& x) {
const Array& array = doris::vectorized::get<const Array&>(x);
size_t size = array.size();
diff --git a/be/src/vec/columns/column_array.h
b/be/src/vec/columns/column_array.h
index 686089f4e9..820064505d 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -83,6 +83,8 @@ public:
StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*&
begin) const override;
const char* deserialize_and_insert_from_arena(const char* pos) override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override;
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert(const Field& x) override;
void insert_from(const IColumn& src_, size_t n) override;
diff --git a/be/src/vec/columns/column_complex.h
b/be/src/vec/columns/column_complex.h
index 81e826d5b9..d7e99406e5 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -183,10 +183,16 @@ public:
LOG(FATAL) << "deserialize_and_insert_from_arena not implemented";
}
+ // maybe we do not need to impl the function
void update_hash_with_value(size_t n, SipHash& hash) const override {
// TODO add hash function
}
+ void update_hashes_with_value(std::vector<SipHash>& hash,
+ const uint8_t* __restrict null_data) const
override {
+ // TODO add hash function
+ }
+
[[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
int nan_direction_hint) const override {
LOG(FATAL) << "compare_at not implemented";
diff --git a/be/src/vec/columns/column_const.cpp
b/be/src/vec/columns/column_const.cpp
index efb790249a..946074cf61 100644
--- a/be/src/vec/columns/column_const.cpp
+++ b/be/src/vec/columns/column_const.cpp
@@ -22,6 +22,7 @@
#include "vec/columns/columns_common.h"
#include "vec/common/pod_array.h"
+#include "vec/common/sip_hash.h"
#include "vec/common/typeid_cast.h"
namespace doris::vectorized {
@@ -86,6 +87,22 @@ ColumnPtr ColumnConst::permute(const Permutation& perm,
size_t limit) const {
return ColumnConst::create(data, limit);
}
+/// Updates every entry of `hashes` with the single constant value held by this column.
+/// `null_data` must be nullptr (no row mask is supported for const columns) and
+/// `hashes` must have exactly size() entries.
+void ColumnConst::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                           const uint8_t* __restrict null_data) const {
+    DCHECK(null_data == nullptr);
+    DCHECK(hashes.size() == size());
+    // Delegate to the nested column's own per-row hashing (row 0 holds the constant),
+    // exactly as ColumnConst::update_hash_with_value does. Hashing the raw bytes from
+    // get_data_at(0) instead would diverge from the per-row path for column types whose
+    // hash covers more than the payload bytes (e.g. strings also hash their length).
+    for (auto& hash : hashes) {
+        data->update_hash_with_value(0, hash);
+    }
+}
+
MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector&
selector) const {
if (s != selector.size()) {
LOG(FATAL) << fmt::format("Size of selector ({}) doesn't match size of
column ({})",
diff --git a/be/src/vec/columns/column_const.h
b/be/src/vec/columns/column_const.h
index 615c936a5e..be7f56ab23 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -132,6 +132,9 @@ public:
data->update_hash_with_value(0, hash);
}
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override;
+
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const
override;
ColumnPtr replicate(const Offsets& offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index 1e34056286..91145e4473 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -120,6 +120,12 @@ void ColumnDecimal<T>::update_hash_with_value(size_t n,
SipHash& hash) const {
hash.update(data[n]);
}
+template <typename T>
+void ColumnDecimal<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict
null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
template <typename T>
void ColumnDecimal<T>::get_permutation(bool reverse, size_t limit, int,
IColumn::Permutation& res) const {
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index a43227bd41..2b3dac0ae5 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -154,6 +154,9 @@ public:
const uint8_t* null_map) override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_hashes_with_value(std::vector<SipHash>& hash,
+ const uint8_t* __restrict null_data) const
override;
+
int compare_at(size_t n, size_t m, const IColumn& rhs_, int
nan_direction_hint) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
IColumn::Permutation& res) const override;
diff --git a/be/src/vec/columns/column_dummy.h
b/be/src/vec/columns/column_dummy.h
index e7fb657161..96547bfd00 100644
--- a/be/src/vec/columns/column_dummy.h
+++ b/be/src/vec/columns/column_dummy.h
@@ -74,6 +74,9 @@ public:
void update_hash_with_value(size_t /*n*/, SipHash& /*hash*/) const
override {}
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override {};
+
void insert_from(const IColumn&, size_t) override { ++s; }
void insert_range_from(const IColumn& /*src*/, size_t /*start*/, size_t
length) override {
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index 14eebe3150..cff4d0f7c3 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -20,6 +20,7 @@
#include "vec/columns/column_nullable.h"
+#include "util/simd/bits.h"
#include "vec/columns/column_const.h"
#include "vec/common/arena.h"
#include "vec/common/assert_cast.h"
@@ -50,6 +51,22 @@ void ColumnNullable::update_hash_with_value(size_t n,
SipHash& hash) const {
get_nested_column().update_hash_with_value(n, hash);
}
+/// Batch hash update for a nullable column: hashes[i] is updated with row i.
+/// `null_data` must be nullptr; this column supplies its own null map to the
+/// nested column. `hashes` must have exactly size() entries.
+void ColumnNullable::update_hashes_with_value(std::vector<SipHash>& hashes,
+                                              const uint8_t* __restrict null_data) const {
+    DCHECK(null_data == nullptr);
+    const size_t s = hashes.size();
+    DCHECK(s == size());
+    auto* __restrict real_null_data =
+            assert_cast<const ColumnUInt8&>(*null_map).get_data().data();
+    if (doris::simd::count_zero_num(reinterpret_cast<const int8_t*>(real_null_data), s) == s) {
+        // Every null-map entry is zero (presumably: no NULL rows), so the nested
+        // column can hash all rows without consulting the map.
+        nested_column->update_hashes_with_value(hashes, nullptr);
+    } else {
+        // Rows flagged in the null map get a fixed marker (0) folded into their hash;
+        // size_t index avoids a signed/unsigned comparison against `s`.
+        for (size_t i = 0; i < s; ++i) {
+            if (real_null_data[i] != 0) hashes[i].update(0);
+        }
+        // The nested column then hashes only the rows whose null-map entry is 0.
+        nested_column->update_hashes_with_value(hashes, real_null_data);
+    }
+}
+
MutableColumnPtr ColumnNullable::clone_resized(size_t new_size) const {
MutableColumnPtr new_nested_col =
get_nested_column().clone_resized(new_size);
auto new_null_map = ColumnUInt8::create();
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 05807e10ec..397f3be1bd 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -159,6 +159,8 @@ public:
ColumnPtr replicate(const Offsets& replicate_offsets) const override;
void replicate(const uint32_t* counts, size_t target_size, IColumn&
column) const override;
void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override;
void get_extremes(Field& min, Field& max) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector& selector)
const override {
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index d8a704e362..d2744e20b7 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -246,10 +246,16 @@ public:
size_t string_size = size_at(n);
size_t offset = offset_at(n);
+ // TODO: Rethink we really need to update the string_size?
hash.update(reinterpret_cast<const char*>(&string_size),
sizeof(string_size));
hash.update(reinterpret_cast<const char*>(&chars[offset]),
string_size);
}
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+ }
+
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert_indices_from(const IColumn& src, const int* indices_begin,
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index 279a777992..13c5810026 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -105,6 +105,12 @@ void ColumnVector<T>::update_hash_with_value(size_t n,
SipHash& hash) const {
hash.update(data[n]);
}
+template <typename T>
+void ColumnVector<T>::update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict
null_data) const {
+ SIP_HASHES_FUNCTION_COLUMN_IMPL();
+}
+
template <typename T>
struct ColumnVector<T>::less {
const Self& parent;
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index ad39b814f2..5101894b92 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -247,6 +247,9 @@ public:
void update_hash_with_value(size_t n, SipHash& hash) const override;
+ void update_hashes_with_value(std::vector<SipHash>& hashes,
+ const uint8_t* __restrict null_data) const
override;
+
size_t byte_size() const override { return data.size() * sizeof(data[0]); }
size_t allocated_bytes() const override { return data.allocated_bytes(); }
diff --git a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
index 869de36a26..bbd7cf576c 100644
--- a/be/src/vec/exprs/vruntimefilter_wrapper.cpp
+++ b/be/src/vec/exprs/vruntimefilter_wrapper.cpp
@@ -21,7 +21,6 @@
#include "util/simd/bits.h"
#include "vec/columns/column_nullable.h"
-#include "vec/columns/column_set.h"
#include "vec/core/field.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/functions/simple_function_factory.h"
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index 34b98a9967..e22e9277af 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -528,10 +528,7 @@ Status VDataStreamSender::send(RuntimeState* state, Block*
block) {
std::vector<SipHash> siphashs(rows);
// result[j] means column index, i means rows index
for (int j = 0; j < result_size; ++j) {
- auto column = block->get_by_position(result[j]).column;
- for (int i = 0; i < rows; ++i) {
- column->update_hash_with_value(i, siphashs[i]);
- }
+
block->get_by_position(result[j]).column->update_hashes_with_value(siphashs);
}
// channel2rows' subscript means channel id
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]