This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new b7c9007776 [improvement][agg]Process aggregated results in the
vectorized way (#11084)
b7c9007776 is described below
commit b7c9007776cf9b8f9db343d41b164cc6760f0975
Author: Jerry Hu <[email protected]>
AuthorDate: Fri Jul 22 22:04:43 2022 +0800
[improvement][agg]Process aggregated results in the vectorized way (#11084)
---
.../vec/aggregate_functions/aggregate_function.h | 48 +++++++++++++
be/src/vec/columns/column.h | 21 ++++++
be/src/vec/columns/column_decimal.cpp | 22 ++++++
be/src/vec/columns/column_decimal.h | 12 ++++
be/src/vec/columns/column_nullable.cpp | 29 ++++++++
be/src/vec/columns/column_nullable.h | 10 +++
be/src/vec/columns/column_string.cpp | 21 ++++++
be/src/vec/columns/column_string.h | 25 +++++++
be/src/vec/columns/column_vector.cpp | 22 ++++++
be/src/vec/columns/column_vector.h | 13 ++++
be/src/vec/exec/vaggregation_node.cpp | 81 ++++++++++++----------
be/src/vec/exec/vaggregation_node.h | 39 +++++++++++
be/src/vec/exprs/vectorized_agg_fn.cpp | 5 ++
be/src/vec/exprs/vectorized_agg_fn.h | 3 +
14 files changed, 316 insertions(+), 35 deletions(-)
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h
b/be/src/vec/aggregate_functions/aggregate_function.h
index eca81b3199..677c189002 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -91,19 +91,32 @@ public:
virtual void merge(AggregateDataPtr __restrict place,
ConstAggregateDataPtr rhs,
Arena* arena) const = 0;
+ /// Batch variant of merge(). The helper implementation later in this header merges, for each
+ /// i in [0, num_rows), the state at `rhs + size_of_data() * i` into `places[i] + offset`.
+ virtual void merge_vec(const AggregateDataPtr* places, size_t offset,
ConstAggregateDataPtr rhs,
+ Arena* arena, const size_t num_rows) const = 0;
+
/// Serializes state (to transmit it over the network, for example).
virtual void serialize(ConstAggregateDataPtr __restrict place,
BufferWritable& buf) const = 0;
+ /// Batch variant of serialize(). The helper implementation later in this header serializes
+ /// `places[i] + offset` into `buf` for each i in [0, num_rows) and commits `buf` after every row.
+ virtual void serialize_vec(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ BufferWritable& buf, const size_t num_rows)
const = 0;
+
/// Deserializes state. This function is called only for empty (just
created) states.
virtual void deserialize(AggregateDataPtr __restrict place,
BufferReadable& buf,
Arena* arena) const = 0;
+ /// Batch variant of deserialize(). The helper implementation later in this header treats
+ /// `places` as `num_rows` consecutive slots of size_of_data() bytes; for row i it creates a
+ /// state in slot i and deserializes it from `column->get_data_at(i)`.
+ virtual void deserialize_vec(AggregateDataPtr places, ColumnString*
column, Arena* arena,
+ size_t num_rows) const = 0;
+
/// Returns true if a function requires Arena to handle own states (see
add(), merge(), deserialize()).
virtual bool allocates_memory_in_arena() const { return false; }
/// Inserts results into a column.
virtual void insert_result_into(ConstAggregateDataPtr __restrict place,
IColumn& to) const = 0;
+ /// Batch variant of insert_result_into(). The helper implementation later in this header
+ /// calls insert_result_into(places[i] + offset, to) for each i in [0, num_rows), in order.
+ virtual void insert_result_into_vec(const std::vector<AggregateDataPtr>&
places,
+ const size_t offset, IColumn& to,
+ const size_t num_rows) const = 0;
+
/** Returns true for aggregate functions of type -State.
* They are executed as other aggregate functions, but not finalized
(return an aggregation state that can be combined with another).
*/
@@ -176,6 +189,41 @@ public:
static_cast<const Derived*>(this)->add(place, columns, i, arena);
}
}
+
+    /// Row-by-row fallback for the batch result insertion: for every row in
+    /// [0, num_rows) forwards `places[row] + offset` to Derived::insert_result_into,
+    /// appending into `to` in row order.
+    void insert_result_into_vec(const std::vector<AggregateDataPtr>& places, const size_t offset,
+                                IColumn& to, const size_t num_rows) const override {
+        const auto* self = static_cast<const Derived*>(this);
+        for (size_t row = 0; row < num_rows; ++row) {
+            self->insert_result_into(places[row] + offset, to);
+        }
+    }
+
+    /// Row-by-row fallback for batch serialization: serializes the state at
+    /// `places[row] + offset` into `buf` and commits the buffer after each row.
+    void serialize_vec(const std::vector<AggregateDataPtr>& places, size_t offset,
+                       BufferWritable& buf, const size_t num_rows) const override {
+        const auto* self = static_cast<const Derived*>(this);
+        for (size_t row = 0; row < num_rows; ++row) {
+            self->serialize(places[row] + offset, buf);
+            // One commit per row, matching the per-row serialize-then-commit sequence.
+            buf.commit();
+        }
+    }
+
+    /// Row-by-row fallback for batch deserialization. `places` is treated as `num_rows`
+    /// consecutive slots of size_of_data() bytes each; for row i a state is created in
+    /// slot i and then deserialized from `column->get_data_at(i)`.
+    /// This function only creates the states; it does not destroy them.
+    void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena* arena,
+                         size_t num_rows) const override {
+        const auto* self = static_cast<const Derived*>(this);
+        const auto stride = self->size_of_data();
+        auto slot = places;
+        for (size_t row = 0; row < num_rows; ++row, slot += stride) {
+            VectorBufferReader reader(column->get_data_at(row));
+            self->create(slot);
+            self->deserialize(slot, reader, arena);
+        }
+    }
+
+    /// Row-by-row fallback for batch merge. `rhs` is read as `num_rows` consecutive states of
+    /// size_of_data() bytes each; state i is merged into the state at `places[i] + offset`.
+    void merge_vec(const AggregateDataPtr* places, size_t offset, ConstAggregateDataPtr rhs,
+                   Arena* arena, const size_t num_rows) const override {
+        const auto* self = static_cast<const Derived*>(this);
+        const auto stride = self->size_of_data();
+        for (size_t row = 0; row < num_rows; ++row) {
+            ConstAggregateDataPtr source = rhs + stride * row;
+            self->merge(places[row] + offset, source, arena);
+        }
+    }
};
/// Implements several methods for manipulation with data. T - type of
structure with data for aggregation.
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index b6c8f3e67c..006389ded2 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -204,6 +204,16 @@ public:
LOG(FATAL) << "Method insert_many_binary_data is not supported for "
<< get_name();
}
+    /// Appends `num` strings from the `strings` array to the column.
+    /// The base implementation is a stub that aborts; column types that support
+    /// bulk string insertion override it.
+    virtual void insert_many_strings(const StringRef* strings, size_t num) {
+        // Report the method that was actually called (the message previously named
+        // insert_many_binary_data, which made the fatal log misleading).
+        LOG(FATAL) << "Method insert_many_strings is not supported for " << get_name();
+    }
+
+    // `pos` points to memory whose element type is the same as this column's data type.
+    // Used by `insert_keys_into_columns` in AggregationNode.
+    // The base implementation is a stub that aborts; supporting columns override it.
+    virtual void insert_many_raw_data(const char* pos, size_t num) {
+        LOG(FATAL) << "Method insert_many_raw_data is not supported for " << get_name();
+    }
+
void insert_many_data(const char* pos, size_t length, size_t data_num) {
for (size_t i = 0; i < data_num; ++i) {
insert_data(pos, length);
@@ -263,6 +273,17 @@ public:
LOG(FATAL) << "serialize_vec_with_null_map not supported";
}
+    // Deserializes group-by keys into this column in a vectorized (batch) fashion.
+    // The base implementation is a stub that aborts; supporting columns override it.
+    virtual void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+        LOG(FATAL) << "deserialize_vec not supported";
+    }
+
+    // Null-map-aware counterpart of deserialize_vec; called from ColumnNullable::deserialize_vec.
+    // The base implementation is a stub that aborts; supporting columns override it.
+    virtual void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const size_t num_rows,
+                                               const uint8_t* null_map) {
+        LOG(FATAL) << "deserialize_vec_with_null_map not supported";
+    }
+
/// Update state of hash function with value of n-th element.
/// On subsequent calls of this method for sequence of column values of
arbitrary types,
/// passed bytes to hash must identify sequence of values unambiguously.
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index 315c1103c0..1b3c58a778 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -85,6 +85,28 @@ void
ColumnDecimal<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
}
}
+// Appends one value per key: each key's cursor is advanced to the pointer returned by
+// deserialize_and_insert_from_arena, and its remaining size is reduced by sizeof(T).
+template <typename T>
+void ColumnDecimal<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    for (size_t row = 0; row != num_rows; ++row) {
+        auto& key = keys[row];
+        key.data = deserialize_and_insert_from_arena(key.data);
+        key.size -= sizeof(T);
+    }
+}
+
+// Same as deserialize_vec, except that rows flagged in `null_map` (non-zero) get a default
+// value appended and leave the corresponding key untouched.
+template <typename T>
+void ColumnDecimal<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
+                                                     const size_t num_rows,
+                                                     const uint8_t* null_map) {
+    for (size_t row = 0; row != num_rows; ++row) {
+        if (null_map[row] != 0) {
+            insert_default();
+            continue;
+        }
+        auto& key = keys[row];
+        key.data = deserialize_and_insert_from_arena(key.data);
+        key.size -= sizeof(T);
+    }
+}
+
template <typename T>
UInt64 ColumnDecimal<T>::get64(size_t n) const {
if constexpr (sizeof(T) > sizeof(UInt64)) {
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index d28e7ae469..e318271b13 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -114,6 +114,13 @@ public:
}
void insert_many_fix_len_data(const char* data_ptr, size_t num) override;
+
+    // Appends `num` values by copying `num * sizeof(T)` raw bytes from `pos`
+    // onto the end of the column's storage.
+    void insert_many_raw_data(const char* pos, size_t num) override {
+        const size_t rows_before = data.size();
+        data.resize(rows_before + num);
+        auto* dst = data.data() + rows_before;
+        memcpy(dst, pos, num * sizeof(T));
+    }
+
void insert_data(const char* pos, size_t /*length*/) override;
void insert_default() override { data.push_back(T()); }
void insert(const Field& x) override {
@@ -141,6 +148,11 @@ public:
const uint8_t* null_map,
size_t max_row_byte_size) const
override;
+ void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows)
override;
+
+ void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const
size_t num_rows,
+ const uint8_t* null_map) override;
+
void update_hash_with_value(size_t n, SipHash& hash) const override;
int compare_at(size_t n, size_t m, const IColumn& rhs_, int
nan_direction_hint) const override;
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
diff --git a/be/src/vec/columns/column_nullable.cpp
b/be/src/vec/columns/column_nullable.cpp
index 982ddf9fdd..b8058a8d97 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -96,6 +96,20 @@ void ColumnNullable::insert_data(const char* pos, size_t
length) {
}
}
+// Appends `num` strings. An entry whose `data` pointer is nullptr is stored as NULL
+// (default value in the nested column, null flag 1); every other entry is copied into
+// the nested column with null flag 0.
+void ColumnNullable::insert_many_strings(const StringRef* strings, size_t num) {
+    auto& nested = get_nested_column();
+    auto& null_flags = get_null_map_data();
+    for (size_t row = 0; row < num; ++row) {
+        const StringRef& str = strings[row];
+        const bool is_null = (str.data == nullptr);
+        if (is_null) {
+            nested.insert_default();
+        } else {
+            nested.insert_data(str.data, str.size);
+        }
+        null_flags.push_back(is_null ? 1 : 0);
+    }
+}
+
StringRef ColumnNullable::serialize_value_into_arena(size_t n, Arena& arena,
char const*& begin) const
{
const auto& arr = get_null_map_data();
@@ -152,6 +166,21 @@ void ColumnNullable::serialize_vec(std::vector<StringRef>&
keys, size_t num_rows
get_nested_column().serialize_vec_with_null_map(keys, num_rows,
arr.data(), max_row_byte_size);
}
+// Deserializes `num_rows` nullable keys. Each key starts with a one-byte null flag, which is
+// copied into the null map and stripped from the key; the nested column then consumes the
+// rest of each key via deserialize_vec_with_null_map.
+void ColumnNullable::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    auto& arr = get_null_map_data();
+    const size_t old_size = arr.size();
+    arr.resize(old_size + num_rows);
+
+    // Flags for the rows appended by this call start at `old_size`.
+    auto* null_map_data = arr.data() + old_size;
+    for (size_t i = 0; i != num_rows; ++i) {
+        UInt8 val = *reinterpret_cast<const UInt8*>(keys[i].data);
+        null_map_data[i] = val;
+        keys[i].data += sizeof(val);
+        keys[i].size -= sizeof(val);
+    }
+    // The nested column indexes the map as null_map[0 .. num_rows), so it must receive the
+    // flags of the new rows. Passing arr.data() would hand it the flags of pre-existing rows
+    // whenever the column was not empty on entry.
+    get_nested_column().deserialize_vec_with_null_map(keys, num_rows, null_map_data);
+}
+
void ColumnNullable::insert_range_from(const IColumn& src, size_t start,
size_t length) {
const ColumnNullable& nullable_col = assert_cast<const
ColumnNullable&>(src);
get_null_map_column().insert_range_from(*nullable_col.null_map, start,
length);
diff --git a/be/src/vec/columns/column_nullable.h
b/be/src/vec/columns/column_nullable.h
index 1fa9a50e10..06e3c9fa8a 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -87,11 +87,16 @@ public:
/// JOIN_NULL_HINT in null map means null is generated by join, only use
in tuple is null
void insert_join_null_data();
+ void insert_many_strings(const StringRef* strings, size_t num) override;
+
StringRef serialize_value_into_arena(size_t n, Arena& arena, char const*&
begin) const override;
const char* deserialize_and_insert_from_arena(const char* pos) override;
size_t get_max_row_byte_size() const override;
void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
size_t max_row_byte_size) const override;
+
+ void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows)
override;
+
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
@@ -107,6 +112,11 @@ public:
get_nested_column().insert_many_fix_len_data(pos, num);
}
+    // Bulk-appends `num` raw values to the nested column; the null map is first extended
+    // with `num` zero flags via fill(0, num).
+    void insert_many_raw_data(const char* pos, size_t num) override {
+        auto& null_flags = get_null_map_column();
+        null_flags.fill(0, num);
+        auto& nested = get_nested_column();
+        nested.insert_many_raw_data(pos, num);
+    }
+
void insert_many_dict_data(const int32_t* data_array, size_t start_index,
const StringRef* dict,
size_t data_num, uint32_t dict_num) override {
get_null_map_column().fill(0, data_num);
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index e74ec544dc..3adf082ae0 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -228,6 +228,27 @@ void
ColumnString::serialize_vec_with_null_map(std::vector<StringRef>& keys, siz
}
}
+// Appends one string per key. Each key is advanced to the pointer returned by
+// deserialize_and_insert_from_arena and its size is reduced by the number of bytes consumed.
+void ColumnString::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    for (size_t row = 0; row < num_rows; ++row) {
+        auto& key = keys[row];
+        const auto begin = key.data;
+        key.data = deserialize_and_insert_from_arena(begin);
+        key.size -= key.data - begin;
+    }
+}
+
+// Same as deserialize_vec, except that rows flagged in `null_map` (non-zero) get a default
+// value appended and leave the corresponding key untouched.
+void ColumnString::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
+                                                 const size_t num_rows, const uint8_t* null_map) {
+    for (size_t row = 0; row < num_rows; ++row) {
+        if (null_map[row] != 0) {
+            insert_default();
+            continue;
+        }
+        auto& key = keys[row];
+        const auto begin = key.data;
+        key.data = deserialize_and_insert_from_arena(begin);
+        key.size -= key.data - begin;
+    }
+}
+
template <typename Type>
ColumnPtr ColumnString::index_impl(const PaddedPODArray<Type>& indexes, size_t
limit) const {
if (limit == 0) return ColumnString::create();
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 6d7b55da2d..bbe6993ae2 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -180,6 +180,26 @@ public:
}
};
+    // Appends `num` strings in two passes: first grow `chars` once by the total number of
+    // bytes needed (each string plus one terminating zero byte), then copy every string,
+    // write its terminator and record the running end position in `offsets`.
+    void insert_many_strings(const StringRef* strings, size_t num) override {
+        size_t bytes_to_add = 0;
+        for (size_t idx = 0; idx != num; ++idx) {
+            bytes_to_add += strings[idx].size + 1;
+        }
+
+        const size_t write_begin = chars.size();
+        chars.resize(write_begin + bytes_to_add);
+
+        Char* dst = chars.data();
+        size_t cursor = write_begin;
+        for (size_t idx = 0; idx != num; ++idx) {
+            const StringRef& str = strings[idx];
+            uint32_t len = str.size;
+            if (len != 0) {
+                memcpy(dst + cursor, str.data, len);
+            }
+            dst[cursor + len] = 0;
+            cursor += len + 1;
+            offsets.push_back(cursor);
+        }
+    }
+
void insert_many_dict_data(const int32_t* data_array, size_t start_index,
const StringRef* dict,
size_t num, uint32_t /*dict_num*/) override {
for (size_t end_index = start_index + num; start_index < end_index;
++start_index) {
@@ -208,6 +228,8 @@ public:
const char* deserialize_and_insert_from_arena(const char* pos) override;
+ void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows)
override;
+
size_t get_max_row_byte_size() const override;
void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
@@ -217,6 +239,9 @@ public:
const uint8_t* null_map,
size_t max_row_byte_size) const override;
+ void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const
size_t num_rows,
+ const uint8_t* null_map) override;
+
void update_hash_with_value(size_t n, SipHash& hash) const override {
size_t string_size = size_at(n);
size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index 3a198fef20..c740a515a0 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -78,6 +78,28 @@ void
ColumnVector<T>::serialize_vec_with_null_map(std::vector<StringRef>& keys,
}
}
+// Appends one value per key: each key's cursor is advanced to the pointer returned by
+// deserialize_and_insert_from_arena, and its remaining size is reduced by sizeof(T).
+template <typename T>
+void ColumnVector<T>::deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows) {
+    for (size_t row = 0; row < num_rows; ++row) {
+        auto& key = keys[row];
+        key.data = deserialize_and_insert_from_arena(key.data);
+        key.size -= sizeof(T);
+    }
+}
+
+// Same as deserialize_vec, except that rows flagged in `null_map` (non-zero) get a default
+// value appended and leave the corresponding key untouched.
+template <typename T>
+void ColumnVector<T>::deserialize_vec_with_null_map(std::vector<StringRef>& keys,
+                                                    const size_t num_rows,
+                                                    const uint8_t* null_map) {
+    for (size_t row = 0; row != num_rows; ++row) {
+        if (null_map[row] != 0) {
+            insert_default();
+            continue;
+        }
+        auto& key = keys[row];
+        key.data = deserialize_and_insert_from_arena(key.data);
+        key.size -= sizeof(T);
+    }
+}
+
template <typename T>
void ColumnVector<T>::update_hash_with_value(size_t n, SipHash& hash) const {
hash.update(data[n]);
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index 544cf8bc4a..c673f0b7bd 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -208,6 +208,14 @@ public:
}
}
+    // Bulk-appends `num` raw values from `pos`. Int128 columns go through
+    // insert_many_in_copy_way; every other element type uses insert_many_default_type.
+    void insert_many_raw_data(const char* pos, size_t num) override {
+        constexpr bool is_int128 = std::is_same_v<T, vectorized::Int128>;
+        if constexpr (!is_int128) {
+            insert_many_default_type(pos, num);
+        } else {
+            insert_many_in_copy_way(pos, num);
+        }
+    }
+
void insert_default() override { data.push_back(T()); }
void insert_many_defaults(size_t length) override {
@@ -222,6 +230,11 @@ public:
const char* deserialize_and_insert_from_arena(const char* pos) override;
+ void deserialize_vec(std::vector<StringRef>& keys, const size_t num_rows)
override;
+
+ void deserialize_vec_with_null_map(std::vector<StringRef>& keys, const
size_t num_rows,
+ const uint8_t* null_map) override;
+
size_t get_max_row_byte_size() const override;
void serialize_vec(std::vector<StringRef>& keys, size_t num_rows,
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index c7539996c6..c5f3d7ddf9 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -959,16 +959,27 @@ Status
AggregationNode::_get_with_serialized_key_result(RuntimeState* state, Blo
auto& data = agg_method.data;
auto& iter = agg_method.iterator;
agg_method.init_once();
- while (iter != data.end() && key_columns[0]->size() <
state->batch_size()) {
- const auto& key = iter->get_first();
- auto& mapped = iter->get_second();
- agg_method.insert_key_into_columns(key, key_columns,
_probe_key_sz);
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i)
- _aggregate_evaluators[i]->insert_result_info(
- mapped + _offsets_of_aggregate_states[i],
value_columns[i].get());
-
+ const auto size = std::min(data.size(),
size_t(state->batch_size()));
+ using KeyType = std::decay_t<decltype(iter->get_first())>;
+ std::vector<KeyType> keys(size);
+ std::vector<AggregateDataPtr> values(size);
+
+ size_t num_rows = 0;
+ while (iter != data.end() && num_rows < state->batch_size()) {
+ keys[num_rows] = iter->get_first();
+ values[num_rows] = iter->get_second();
++iter;
+ ++num_rows;
+ }
+
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows, _probe_key_sz);
+
+ for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->insert_result_info_vec(
+ values, _offsets_of_aggregate_states[i],
value_columns[i].get(),
+ num_rows);
}
+
if (iter == data.end()) {
if (agg_method.data.has_null_key_data()) {
// only one key of group by support wrap null key
@@ -1043,19 +1054,26 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
agg_method.init_once();
auto& data = agg_method.data;
auto& iter = agg_method.iterator;
- while (iter != data.end() && key_columns[0]->size() <
state->batch_size()) {
- const auto& key = iter->get_first();
- auto& mapped = iter->get_second();
- // insert keys
- agg_method.insert_key_into_columns(key, key_columns,
_probe_key_sz);
-
- // serialize values
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize(
- mapped + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
- value_buffer_writers[i].commit();
- }
+
+ const auto size = std::min(data.size(),
size_t(state->batch_size()));
+ using KeyType = std::decay_t<decltype(iter->get_first())>;
+ std::vector<KeyType> keys(size);
+ std::vector<AggregateDataPtr> values(size);
+
+ size_t num_rows = 0;
+ while (iter != data.end() && num_rows < state->batch_size()) {
+ keys[num_rows] = iter->get_first();
+ values[num_rows] = iter->get_second();
++iter;
+ ++num_rows;
+ }
+
+ agg_method.insert_keys_into_columns(keys, key_columns,
num_rows, _probe_key_sz);
+
+ for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->function()->serialize_vec(
+ values, _offsets_of_aggregate_states[i],
value_buffer_writers[i],
+ num_rows);
}
if (iter == data.end()) {
@@ -1166,8 +1184,6 @@ Status AggregationNode::_merge_with_serialized_key(Block*
block) {
},
_agg_data._aggregated_method_variant);
- std::unique_ptr<char[]> deserialize_buffer(new
char[_total_size_of_aggregate_states]);
-
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
DCHECK(_aggregate_evaluators[i]->input_exprs_ctxs().size() == 1 &&
_aggregate_evaluators[i]->input_exprs_ctxs()[0]->root()->is_slot_ref());
@@ -1179,21 +1195,16 @@ Status
AggregationNode::_merge_with_serialized_key(Block* block) {
column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
}
- for (int j = 0; j < rows; ++j) {
- VectorBufferReader
buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
- _create_agg_status(deserialize_buffer.get());
-
- _aggregate_evaluators[i]->function()->deserialize(
- deserialize_buffer.get() +
_offsets_of_aggregate_states[i], buffer_reader,
- &_agg_arena_pool);
+            // Scratch space for `rows` consecutive states of this evaluator. The array form
+            // unique_ptr<char[]> is required so the new[] allocation is released with delete[].
+            std::unique_ptr<char[]> deserialize_buffer(
+                    new char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
- _aggregate_evaluators[i]->function()->merge(
- places.data()[j] + _offsets_of_aggregate_states[i],
- deserialize_buffer.get() +
_offsets_of_aggregate_states[i],
- &_agg_arena_pool);
+
_aggregate_evaluators[i]->function()->deserialize_vec(deserialize_buffer.get(),
+
(ColumnString*)(column.get()),
+
&_agg_arena_pool, rows);
+ _aggregate_evaluators[i]->function()->merge_vec(
+ places.data(), _offsets_of_aggregate_states[i],
deserialize_buffer.get(),
+ &_agg_arena_pool, rows);
- _destroy_agg_status(deserialize_buffer.get());
- }
} else {
_aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
places.data(),
&_agg_arena_pool);
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 08834e4611..fd6b67f255 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -92,6 +92,11 @@ struct AggregationMethodSerialized {
for (auto& column : key_columns) pos =
column->deserialize_and_insert_from_arena(pos);
}
+    // Batch key output for serialized keys: every key column, in order, deserializes its
+    // part from each of the first `num_rows` keys. `keys` is passed on as a mutable
+    // reference to each column's deserialize_vec.
+    static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        for (auto& key_column : key_columns) {
+            key_column->deserialize_vec(keys, num_rows);
+        }
+    }
+
void init_once() {
if (!inited) {
inited = true;
@@ -136,6 +141,12 @@ struct AggregationMethodStringNoCache {
key_columns[0]->insert_data(key.data, key.size);
}
+    // Batch key output for a single string key column: reserves `num_rows` and appends the
+    // first `num_rows` keys with one insert_many_strings call.
+    static void insert_keys_into_columns(std::vector<StringRef>& keys, MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        auto& key_column = key_columns[0];
+        key_column->reserve(num_rows);
+        key_column->insert_many_strings(keys.data(), num_rows);
+    }
+
void init_once() {
if (!inited) {
inited = true;
@@ -174,6 +185,16 @@ struct AggregationMethodOneNumber {
column->insert_raw_data<sizeof(FieldType)>(key_holder);
}
+    // Batch key output for a single numeric key column: reserves `num_rows`, then appends
+    // the raw bytes of each key (sizeof(FieldType) bytes per key) in order.
+    static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        key_columns[0]->reserve(num_rows);
+        auto* target = static_cast<ColumnVectorHelper*>(key_columns[0].get());
+        for (size_t row = 0; row < num_rows; ++row) {
+            const auto* raw_key = reinterpret_cast<const char*>(&keys[row]);
+            target->insert_raw_data<sizeof(FieldType)>(raw_key);
+        }
+    }
+
void init_once() {
if (!inited) {
inited = true;
@@ -275,6 +296,13 @@ struct AggregationMethodKeysFixed {
}
}
+    // Batch key output for fixed-size packed keys: forwards each of the first `num_rows`
+    // keys to the single-key insert_key_into_columns, in order.
+    static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes& key_sizes) {
+        for (size_t row = 0; row < num_rows; ++row) {
+            insert_key_into_columns(keys[row], key_columns, key_sizes);
+        }
+    }
+
void init_once() {
if (!inited) {
inited = true;
@@ -312,6 +340,17 @@ struct AggregationMethodSingleNullableColumn : public
SingleColumnMethod {
col->insert_data(reinterpret_cast<const char*>(&key), sizeof(key));
}
}
+
+    // Batch key output for a single nullable key column: reserves `num_rows`, then appends
+    // the keys in one call. StringRef keys go through insert_many_strings; any other key
+    // type is appended as raw bytes through insert_many_raw_data.
+    static void insert_keys_into_columns(std::vector<Key>& keys, MutableColumns& key_columns,
+                                         const size_t num_rows, const Sizes&) {
+        auto target = key_columns[0].get();
+        target->reserve(num_rows);
+        if constexpr (!std::is_same_v<Key, StringRef>) {
+            target->insert_many_raw_data(reinterpret_cast<char*>(keys.data()), num_rows);
+        } else {
+            target->insert_many_strings(keys.data(), num_rows);
+        }
+    }
};
using AggregatedDataWithUInt8Key =
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 6882788e2c..427b90ec2f 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -136,6 +136,11 @@ void AggFnEvaluator::insert_result_info(AggregateDataPtr
place, IColumn* column)
_function->insert_result_into(place, *column);
}
+// Batch counterpart of insert_result_info: forwards to the aggregate function's
+// insert_result_into_vec with the pointed-to `column` as the destination.
+void AggFnEvaluator::insert_result_info_vec(const std::vector<AggregateDataPtr>& places,
+                                            size_t offset, IColumn* column,
+                                            const size_t num_rows) {
+    IColumn& destination = *column;
+    _function->insert_result_into_vec(places, offset, destination, num_rows);
+}
+
void AggFnEvaluator::reset(AggregateDataPtr place) {
_function->reset(place);
}
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h
b/be/src/vec/exprs/vectorized_agg_fn.h
index 8a4f777355..a541257487 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -58,6 +58,9 @@ public:
void insert_result_info(AggregateDataPtr place, IColumn* column);
+ void insert_result_info_vec(const std::vector<AggregateDataPtr>& place,
size_t offset,
+ IColumn* column, const size_t num_rows);
+
void reset(AggregateDataPtr place);
DataTypePtr& data_type() { return _data_type; }
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]