This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new dc8f64b3e3 [improvement](agg) Serialize the fixed-length aggregation
results with corresponding columns instead of ColumnString (#11801)
dc8f64b3e3 is described below
commit dc8f64b3e3563c41857e30485dbd4d68ea9da7ab
Author: Jerry Hu <[email protected]>
AuthorDate: Mon Aug 22 10:12:06 2022 +0800
[improvement](agg) Serialize the fixed-length aggregation results with
corresponding columns instead of ColumnString (#11801)
---
be/src/vec/CMakeLists.txt | 1 +
.../vec/aggregate_functions/aggregate_function.h | 84 +++++++-
.../aggregate_functions/aggregate_function_avg.h | 72 ++++++-
.../aggregate_functions/aggregate_function_count.h | 96 +++++++++
.../aggregate_function_min_max.h | 121 ++++++++++-
.../aggregate_function_nothing.h | 3 +
.../aggregate_functions/aggregate_function_null.h | 10 +
.../aggregate_functions/aggregate_function_sum.h | 53 +++++
be/src/vec/columns/column_fixed_length_object.h | 197 ++++++++++++++++++
be/src/vec/core/field.h | 12 ++
be/src/vec/core/types.h | 3 +
be/src/vec/data_types/data_type.cpp | 2 +
be/src/vec/data_types/data_type_factory.cpp | 3 +
be/src/vec/data_types/data_type_factory.hpp | 1 +
.../data_types/data_type_fixed_length_object.cpp | 67 +++++++
.../vec/data_types/data_type_fixed_length_object.h | 59 ++++++
be/src/vec/exec/vaggregation_node.cpp | 223 +++++++++++++--------
be/src/vec/exec/vaggregation_node.h | 36 +++-
be/src/vec/exprs/vectorized_agg_fn.cpp | 14 ++
be/src/vec/exprs/vectorized_agg_fn.h | 6 +
.../org/apache/doris/planner/AggregationNode.java | 1 +
gensrc/proto/types.proto | 1 +
gensrc/thrift/PlanNodes.thrift | 3 +-
23 files changed, 969 insertions(+), 99 deletions(-)
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index bc37c49310..8743013590 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -66,6 +66,7 @@ set(VEC_FILES
data_types/data_type_array.cpp
data_types/data_type_bitmap.cpp
data_types/data_type_factory.cpp
+ data_types/data_type_fixed_length_object.cpp
data_types/data_type_hll.cpp
data_types/data_type_nothing.cpp
data_types/data_type_nothing.cpp
diff --git a/be/src/vec/aggregate_functions/aggregate_function.h
b/be/src/vec/aggregate_functions/aggregate_function.h
index ae9a3667e5..6309b0c994 100644
--- a/be/src/vec/aggregate_functions/aggregate_function.h
+++ b/be/src/vec/aggregate_functions/aggregate_function.h
@@ -28,6 +28,7 @@
#include "vec/core/column_numbers.h"
#include "vec/core/field.h"
#include "vec/core/types.h"
+#include "vec/data_types/data_type_string.h"
namespace doris::vectorized {
@@ -76,6 +77,9 @@ public:
/// Delete data for aggregation.
virtual void destroy(AggregateDataPtr __restrict place) const noexcept = 0;
+ virtual void destroy_vec(AggregateDataPtr __restrict place,
+ const size_t num_rows) const noexcept = 0;
+
/// Reset aggregation state
virtual void reset(AggregateDataPtr place) const = 0;
@@ -117,17 +121,29 @@ public:
virtual void serialize_vec(const std::vector<AggregateDataPtr>& places,
size_t offset,
BufferWritable& buf, const size_t num_rows)
const = 0;
+ virtual void serialize_to_column(const std::vector<AggregateDataPtr>&
places, size_t offset,
+ MutableColumnPtr& dst, const size_t
num_rows) const = 0;
+
+ virtual void serialize_without_key_to_column(ConstAggregateDataPtr
__restrict place,
+ MutableColumnPtr& dst) const
= 0;
+
/// Deserializes state. This function is called only for empty (just
created) states.
virtual void deserialize(AggregateDataPtr __restrict place,
BufferReadable& buf,
Arena* arena) const = 0;
- virtual void deserialize_vec(AggregateDataPtr places, ColumnString*
column, Arena* arena,
+ virtual void deserialize_vec(AggregateDataPtr places, const ColumnString*
column, Arena* arena,
size_t num_rows) const = 0;
+ virtual void deserialize_from_column(AggregateDataPtr places, const
IColumn& column,
+ Arena* arena, size_t num_rows) const
= 0;
+
/// Deserializes state and merge it with current aggregation function.
virtual void deserialize_and_merge(AggregateDataPtr __restrict place,
BufferReadable& buf,
Arena* arena) const = 0;
+ virtual void deserialize_and_merge_from_column(AggregateDataPtr __restrict
place,
+ const IColumn& column,
Arena* arena) const = 0;
+
/// Returns true if a function requires Arena to handle own states (see
add(), merge(), deserialize()).
virtual bool allocates_memory_in_arena() const { return false; }
@@ -169,9 +185,19 @@ public:
AggregateDataPtr place, const
IColumn** columns,
Arena* arena) const = 0;
+ virtual void streaming_agg_serialize(const IColumn** columns,
BufferWritable& buf,
+ const size_t num_rows, Arena* arena)
const = 0;
+
+ virtual void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows,
Arena* arena) const = 0;
+
const DataTypes& get_argument_types() const { return argument_types; }
const Array& get_parameters() const { return parameters; }
+ virtual MutableColumnPtr create_serialize_column() const { return
ColumnString::create(); }
+
+ virtual DataTypePtr get_serialized_type() const { return
std::make_shared<DataTypeString>(); }
+
protected:
DataTypes argument_types;
Array parameters;
@@ -184,6 +210,14 @@ public:
IAggregateFunctionHelper(const DataTypes& argument_types_, const Array&
parameters_)
: IAggregateFunction(argument_types_, parameters_) {}
+ void destroy_vec(AggregateDataPtr __restrict place,
+ const size_t num_rows) const noexcept override {
+ const size_t size_of_data_ = size_of_data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ static_cast<const Derived*>(this)->destroy(place + size_of_data_ *
i);
+ }
+ }
+
void add_batch(size_t batch_size, AggregateDataPtr* places, size_t
place_offset,
const IColumn** columns, Arena* arena, bool agg_many) const
override {
if constexpr (std::is_same_v<Derived,
AggregateFunctionBitmapCount<false, ColumnBitmap>> ||
@@ -266,7 +300,38 @@ public:
}
}
- void deserialize_vec(AggregateDataPtr places, ColumnString* column, Arena*
arena,
+ void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ MutableColumnPtr& dst, const size_t num_rows)
const override {
+ VectorBufferWriter writter(assert_cast<ColumnString&>(*dst));
+ serialize_vec(places, offset, writter, num_rows);
+ }
+
+ void streaming_agg_serialize(const IColumn** columns, BufferWritable& buf,
+ const size_t num_rows, Arena* arena) const
override {
+ char place[size_of_data()];
+ for (size_t i = 0; i != num_rows; ++i) {
+ static_cast<const Derived*>(this)->create(place);
+ static_cast<const Derived*>(this)->add(place, columns, i, arena);
+ static_cast<const Derived*>(this)->serialize(place, buf);
+ buf.commit();
+ static_cast<const Derived*>(this)->destroy(place);
+ }
+ }
+
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena*
arena) const override {
+ VectorBufferWriter writter(static_cast<ColumnString&>(*dst));
+ streaming_agg_serialize(columns, writter, num_rows, arena);
+ }
+
+ void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
+ MutableColumnPtr& dst) const override
{
+ VectorBufferWriter writter(static_cast<ColumnString&>(*dst));
+ static_cast<const Derived*>(this)->serialize(place, writter);
+ writter.commit();
+ }
+
+ void deserialize_vec(AggregateDataPtr places, const ColumnString* column,
Arena* arena,
size_t num_rows) const override {
const auto size_of_data = static_cast<const
Derived*>(this)->size_of_data();
for (size_t i = 0; i != num_rows; ++i) {
@@ -277,6 +342,11 @@ public:
}
}
+ void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
+ size_t num_rows) const override {
+ deserialize_vec(places, assert_cast<const ColumnString*>(&column),
arena, num_rows);
+ }
+
void merge_vec(const AggregateDataPtr* places, size_t offset,
ConstAggregateDataPtr rhs,
Arena* arena, const size_t num_rows) const override {
const auto size_of_data = static_cast<const
Derived*>(this)->size_of_data();
@@ -338,6 +408,16 @@ public:
derived->merge(place, deserialized_place, arena);
derived->destroy(deserialized_place);
}
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ size_t num_rows = column.size();
+ for (size_t i = 0; i != num_rows; ++i) {
+ VectorBufferReader buffer_reader(
+ (assert_cast<const ColumnString&>(column)).get_data_at(i));
+ deserialize_and_merge(place, buffer_reader, arena);
+ }
+ }
};
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_avg.h
b/be/src/vec/aggregate_functions/aggregate_function_avg.h
index eab938e4c1..c80c46b8f9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_avg.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_avg.h
@@ -22,8 +22,9 @@
#include "common/status.h"
#include "vec/aggregate_functions/aggregate_function.h"
-#include "vec/columns/columns_number.h"
+#include "vec/columns/column_fixed_length_object.h"
#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_fixed_length_object.h"
#include "vec/data_types/data_type_number.h"
#include "vec/io/io_helper.h"
@@ -34,6 +35,12 @@ struct AggregateFunctionAvgData {
T sum = 0;
UInt64 count = 0;
+ AggregateFunctionAvgData& operator=(const AggregateFunctionAvgData<T>&
src) {
+ sum = src.sum;
+ count = src.count;
+ return *this;
+ }
+
template <typename ResultT>
ResultT result() const {
if constexpr (std::is_floating_point_v<ResultT>) {
@@ -136,6 +143,69 @@ public:
column.get_data().push_back(this->data(place).template
result<ResultType>());
}
+ void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
+ size_t num_rows) const override {
+ auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
+ DCHECK(col.size() >= num_rows) << "source column's size should greater
than num_rows";
+ auto* data = col.get_data().data();
+ memcpy(places, data, sizeof(Data) * num_rows);
+ }
+
+ void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ MutableColumnPtr& dst, const size_t num_rows)
const override {
+ auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
+ col.set_item_size(sizeof(Data));
+ col.resize(num_rows);
+ auto* data = col.get_data().data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ *reinterpret_cast<Data*>(&data[sizeof(Data) * i]) =
+ *reinterpret_cast<Data*>(places[i] + offset);
+ }
+ }
+
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena*
arena) const override {
+ auto* src_data = assert_cast<const
ColVecType&>(*columns[0]).get_data().data();
+ auto& dst_col = static_cast<ColumnFixedLengthObject&>(*dst);
+ dst_col.set_item_size(sizeof(Data));
+ dst_col.resize(num_rows);
+ auto* data = dst_col.get_data().data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ auto& state = *reinterpret_cast<Data*>(&data[sizeof(Data) * i]);
+ state.sum = src_data[i];
+ state.count = 1;
+ }
+ }
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ auto& col = assert_cast<const ColumnFixedLengthObject&>(column);
+ const size_t num_rows = column.size();
+ DCHECK(col.size() >= num_rows) << "source column's size should greater
than num_rows";
+ auto* data = reinterpret_cast<const Data*>(col.get_data().data());
+
+ for (size_t i = 0; i != num_rows; ++i) {
+ this->data(place).sum += data[i].sum;
+ this->data(place).count += data[i].count;
+ }
+ }
+
+ void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
+ MutableColumnPtr& dst) const override
{
+ auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
+ col.set_item_size(sizeof(Data));
+ col.resize(1);
+ *reinterpret_cast<Data*>(col.get_data().data()) = this->data(place);
+ }
+
+ MutableColumnPtr create_serialize_column() const override {
+ return ColumnFixedLengthObject::create(sizeof(Data));
+ }
+
+ DataTypePtr get_serialized_type() const override {
+ return std::make_shared<DataTypeFixedLengthObject>();
+ }
+
private:
UInt32 scale;
};
diff --git a/be/src/vec/aggregate_functions/aggregate_function_count.h
b/be/src/vec/aggregate_functions/aggregate_function_count.h
index e2ba3768e7..8ab6b53e56 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_count.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_count.h
@@ -71,6 +71,54 @@ public:
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
}
+
+ void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
+ size_t num_rows) const override {
+ auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+ auto* dst_data = reinterpret_cast<Data*>(places);
+ for (size_t i = 0; i != num_rows; ++i) {
+ dst_data[i].count = data[i];
+ }
+ }
+
+ void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ MutableColumnPtr& dst, const size_t num_rows)
const override {
+ auto& col = assert_cast<ColumnUInt64&>(*dst);
+ col.resize(num_rows);
+ auto* data = col.get_data().data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ data[i] = this->data(places[i] + offset).count;
+ }
+ }
+
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena*
arena) const override {
+ auto& col = assert_cast<ColumnUInt64&>(*dst);
+ col.resize(num_rows);
+ col.get_data().assign(num_rows, 1UL);
+ }
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+ const size_t num_rows = column.size();
+ for (size_t i = 0; i != num_rows; ++i) {
+ this->data(place).count += data[i];
+ }
+ }
+
+ void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
+ MutableColumnPtr& dst) const override
{
+ auto& col = assert_cast<ColumnUInt64&>(*dst);
+ col.resize(1);
+ reinterpret_cast<Data*>(col.get_data().data())->count =
this->data(place).count;
+ }
+
+ MutableColumnPtr create_serialize_column() const override {
+ return ColumnVector<UInt64>::create();
+ }
+
+ DataTypePtr get_serialized_type() const override { return
std::make_shared<DataTypeUInt64>(); }
};
/// Simply count number of not-NULL values.
@@ -117,6 +165,54 @@ public:
assert_cast<ColumnInt64&>(to).get_data().push_back(data(place).count);
}
}
+
+ void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
+ size_t num_rows) const override {
+ auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+ auto* dst_data = reinterpret_cast<Data*>(places);
+ for (size_t i = 0; i != num_rows; ++i) {
+ dst_data[i].count = data[i];
+ }
+ }
+
+ void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ MutableColumnPtr& dst, const size_t num_rows)
const override {
+ auto& col = assert_cast<ColumnUInt64&>(*dst);
+ col.resize(num_rows);
+ auto* data = col.get_data().data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ data[i] = this->data(places[i] + offset).count;
+ }
+ }
+
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena*
arena) const override {
+ auto& col = assert_cast<ColumnUInt64&>(*dst);
+ col.resize(num_rows);
+ col.get_data().assign(num_rows, 1UL);
+ }
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ auto data = assert_cast<const ColumnUInt64&>(column).get_data().data();
+ const size_t num_rows = column.size();
+ for (size_t i = 0; i != num_rows; ++i) {
+ this->data(place).count += data[i];
+ }
+ }
+
+ void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
+ MutableColumnPtr& dst) const override
{
+ auto& col = assert_cast<ColumnUInt64&>(*dst);
+ col.resize(1);
+ reinterpret_cast<Data*>(col.get_data().data())->count =
this->data(place).count;
+ }
+
+ MutableColumnPtr create_serialize_column() const override {
+ return ColumnVector<UInt64>::create();
+ }
+
+ DataTypePtr get_serialized_type() const override { return
std::make_shared<DataTypeUInt64>(); }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
index ee91c0bbb9..8fb36cddee 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_min_max.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_min_max.h
@@ -23,8 +23,10 @@
#include "common/logging.h"
#include "vec/aggregate_functions/aggregate_function.h"
#include "vec/columns/column_decimal.h"
+#include "vec/columns/column_fixed_length_object.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
+#include "vec/data_types/data_type_fixed_length_object.h"
#include "vec/io/io_helper.h"
namespace doris::vectorized {
@@ -40,8 +42,20 @@ private:
T value;
public:
+ SingleValueDataFixed() = default;
+ SingleValueDataFixed(bool has_value_, T value_) : has_value(has_value_),
value(value_) {}
bool has() const { return has_value; }
+ constexpr static bool IsFixedLength = true;
+ using value_type = T;
+
+ value_type get_value() const { return value; }
+
+ void set_value(T value_) {
+ has_value = true;
+ value = value_;
+ }
+
void insert_result_into(IColumn& to) const {
if (has()) {
assert_cast<ColumnVector<T>&>(to).get_data().push_back(value);
@@ -136,8 +150,20 @@ private:
Type value;
public:
+ SingleValueDataDecimal() = default;
+ SingleValueDataDecimal(bool has_value_, T value_) : has_value(has_value_),
value(value_) {}
bool has() const { return has_value; }
+ constexpr static bool IsFixedLength = true;
+ using value_type = Type;
+
+ value_type get_value() const { return value; }
+
+ void set_value(T value_) {
+ has_value = true;
+ value = value_;
+ }
+
void insert_result_into(IColumn& to) const {
if (has()) {
assert_cast<ColumnDecimal<T>&>(to).insert_data((const
char*)&value, 0);
@@ -242,6 +268,10 @@ private:
public:
~SingleValueDataString() { delete[] large_data; }
+ constexpr static bool IsFixedLength = false;
+
+ using value_type = String;
+
bool has() const { return size >= 0; }
const char* get_data() const { return size <= MAX_SMALL_STRING_SIZE ?
small_data : large_data; }
@@ -377,8 +407,9 @@ public:
};
template <typename Data>
-struct AggregateFunctionMaxData : Data {
+struct AggregateFunctionMaxData : public Data {
using Self = AggregateFunctionMaxData;
+ using Data::IsFixedLength;
bool change_if_better(const IColumn& column, size_t row_num, Arena* arena)
{
return this->change_if_greater(column, row_num, arena);
@@ -393,6 +424,7 @@ struct AggregateFunctionMaxData : Data {
template <typename Data>
struct AggregateFunctionMinData : Data {
using Self = AggregateFunctionMinData;
+ using Data::IsFixedLength;
bool change_if_better(const IColumn& column, size_t row_num, Arena* arena)
{
return this->change_if_less(column, row_num, arena);
@@ -408,6 +440,8 @@ class AggregateFunctionsSingleValue final
Data, AggregateFunctionsSingleValue<Data,
AllocatesMemoryInArena>> {
private:
DataTypePtr& type;
+ using Base = IAggregateFunctionDataHelper<
+ Data, AggregateFunctionsSingleValue<Data, AllocatesMemoryInArena>>;
public:
AggregateFunctionsSingleValue(const DataTypePtr& type_)
@@ -456,6 +490,91 @@ public:
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
this->data(place).insert_result_into(to);
}
+
+ void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
+ size_t num_rows) const override {
+ if constexpr (Data::IsFixedLength) {
+ const auto& col = static_cast<const
ColumnFixedLengthObject&>(column);
+ auto* column_data = reinterpret_cast<const
Data*>(col.get_data().data());
+ Data* data = reinterpret_cast<Data*>(places);
+ for (size_t i = 0; i != num_rows; ++i) {
+ data[i] = column_data[i];
+ }
+ } else {
+ Base::deserialize_from_column(places, column, arena, num_rows);
+ }
+ }
+
+ void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ MutableColumnPtr& dst, const size_t num_rows)
const override {
+ if constexpr (Data::IsFixedLength) {
+ auto& dst_column = static_cast<ColumnFixedLengthObject&>(*dst);
+ dst_column.resize(num_rows);
+ auto* dst_data =
reinterpret_cast<Data*>(dst_column.get_data().data());
+ for (size_t i = 0; i != num_rows; ++i) {
+ dst_data[i] = this->data(places[i] + offset);
+ }
+ } else {
+ Base::serialize_to_column(places, offset, dst, num_rows);
+ }
+ }
+
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena*
arena) const override {
+ if constexpr (Data::IsFixedLength) {
+ const auto& src_column = static_cast<const
ColumnFixedLengthObject&>(*columns[0]);
+ auto* src_data = reinterpret_cast<const
Data*>(src_column.get_data().data());
+ auto& dst_column = static_cast<ColumnFixedLengthObject&>(*dst);
+ dst_column.resize(num_rows);
+ auto* dst_data =
reinterpret_cast<Data*>(dst_column.get_data().data());
+ for (size_t i = 0; i != num_rows; ++i) {
+ dst_data[i] = src_data[i];
+ }
+ } else {
+ Base::streaming_agg_serialize_to_column(columns, dst, num_rows,
arena);
+ }
+ }
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ if constexpr (Data::IsFixedLength) {
+ const auto& col = static_cast<const
ColumnFixedLengthObject&>(column);
+ auto* column_data = reinterpret_cast<const
Data*>(col.get_data().data());
+ const size_t num_rows = column.size();
+ for (size_t i = 0; i != num_rows; ++i) {
+ this->data(place).change_if_better(column_data[i], arena);
+ }
+ } else {
+ Base::deserialize_and_merge_from_column(place, column, arena);
+ }
+ }
+
+ void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
+ MutableColumnPtr& dst) const override
{
+ if constexpr (Data::IsFixedLength) {
+ auto& col = assert_cast<ColumnFixedLengthObject&>(*dst);
+ col.resize(1);
+ *reinterpret_cast<Data*>(col.get_data().data()) =
this->data(place);
+ } else {
+ Base::serialize_without_key_to_column(place, dst);
+ }
+ }
+
+ MutableColumnPtr create_serialize_column() const override {
+ if constexpr (Data::IsFixedLength) {
+ return ColumnFixedLengthObject::create(sizeof(Data));
+ } else {
+ return ColumnString::create();
+ }
+ }
+
+ DataTypePtr get_serialized_type() const override {
+ if constexpr (Data::IsFixedLength) {
+ return std::make_shared<DataTypeFixedLengthObject>();
+ } else {
+ return std::make_shared<DataTypeString>();
+ }
+ }
};
AggregateFunctionPtr create_aggregate_function_max(const std::string& name,
diff --git a/be/src/vec/aggregate_functions/aggregate_function_nothing.h
b/be/src/vec/aggregate_functions/aggregate_function_nothing.h
index 64af14a6cf..c6b310a9a5 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_nothing.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_nothing.h
@@ -67,6 +67,9 @@ public:
void deserialize_and_merge(AggregateDataPtr __restrict place,
BufferReadable& buf,
Arena* arena) const override {}
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {}
};
} // namespace doris::vectorized
diff --git a/be/src/vec/aggregate_functions/aggregate_function_null.h
b/be/src/vec/aggregate_functions/aggregate_function_null.h
index 89960bc9f0..7b8489c095 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_null.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_null.h
@@ -163,6 +163,16 @@ public:
}
}
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ size_t num_rows = column.size();
+ for (size_t i = 0; i != num_rows; ++i) {
+ VectorBufferReader buffer_reader(
+ (assert_cast<const ColumnString&>(column)).get_data_at(i));
+ deserialize_and_merge(place, buffer_reader, arena);
+ }
+ }
+
void insert_result_into(ConstAggregateDataPtr __restrict place, IColumn&
to) const override {
if constexpr (result_is_nullable) {
ColumnNullable& to_concrete = assert_cast<ColumnNullable&>(to);
diff --git a/be/src/vec/aggregate_functions/aggregate_function_sum.h
b/be/src/vec/aggregate_functions/aggregate_function_sum.h
index 9c5bdfc3bf..46ca85f3a9 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_sum.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_sum.h
@@ -101,6 +101,59 @@ public:
column.get_data().push_back(this->data(place).get());
}
+ void deserialize_from_column(AggregateDataPtr places, const IColumn&
column, Arena* arena,
+ size_t num_rows) const override {
+ auto data = assert_cast<const ColVecResult&>(column).get_data().data();
+ auto dst_data = reinterpret_cast<Data*>(places);
+ for (size_t i = 0; i != num_rows; ++i) {
+ dst_data[i].sum = data[i];
+ }
+ }
+
+ void serialize_to_column(const std::vector<AggregateDataPtr>& places,
size_t offset,
+ MutableColumnPtr& dst, const size_t num_rows)
const override {
+ auto& col = assert_cast<ColVecResult&>(*dst);
+ col.resize(num_rows);
+ auto* data = col.get_data().data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ data[i] = this->data(places[i] + offset).sum;
+ }
+ }
+
+ void streaming_agg_serialize_to_column(const IColumn** columns,
MutableColumnPtr& dst,
+ const size_t num_rows, Arena*
arena) const override {
+ auto& col = assert_cast<ColVecResult&>(*dst);
+ auto& src = assert_cast<const ColVecType&>(*columns[0]);
+ col.resize(num_rows);
+ auto* src_data = src.get_data().data();
+ auto* dst_data = col.get_data().data();
+ for (size_t i = 0; i != num_rows; ++i) {
+ dst_data[i] = src_data[i];
+ }
+ }
+
+ void deserialize_and_merge_from_column(AggregateDataPtr __restrict place,
const IColumn& column,
+ Arena* arena) const override {
+ auto data = assert_cast<const ColVecResult&>(column).get_data().data();
+ const size_t num_rows = column.size();
+ for (size_t i = 0; i != num_rows; ++i) {
+ this->data(place).sum += data[i];
+ }
+ }
+
+ void serialize_without_key_to_column(ConstAggregateDataPtr __restrict
place,
+ MutableColumnPtr& dst) const override
{
+ auto& col = assert_cast<ColVecResult&>(*dst);
+ col.resize(1);
+ reinterpret_cast<Data*>(col.get_data().data())->sum =
this->data(place).sum;
+ }
+
+ MutableColumnPtr create_serialize_column() const override {
+ return get_return_type()->create_column();
+ }
+
+ DataTypePtr get_serialized_type() const override { return
get_return_type(); }
+
private:
UInt32 scale;
};
diff --git a/be/src/vec/columns/column_fixed_length_object.h
b/be/src/vec/columns/column_fixed_length_object.h
new file mode 100644
index 0000000000..5ecb13ec61
--- /dev/null
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -0,0 +1,197 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/columns/column.h"
+#include "vec/columns/columns_common.h"
+#include "vec/common/arena.h"
+#include "vec/common/pod_array.h"
+
+namespace doris::vectorized {
+
+class ColumnFixedLengthObject final : public COWHelper<IColumn,
ColumnFixedLengthObject> {
+private:
+ using Self = ColumnFixedLengthObject;
+ friend class COWHelper<IColumn, ColumnFixedLengthObject>;
+ friend class OlapBlockDataConvertor;
+
+public:
+ using Container = PaddedPODArray<uint8_t>;
+
+private:
+ ColumnFixedLengthObject() = delete;
+ ColumnFixedLengthObject(const size_t _item_size_) :
_item_size(_item_size_), _item_count(0) {}
+ ColumnFixedLengthObject(const ColumnFixedLengthObject& src)
+ : _item_size(src._item_size),
+ _item_count(src._item_count),
+ _data(src._data.begin(), src._data.end()) {}
+
+public:
+ const char* get_family_name() const override { return
"ColumnFixedLengthObject"; }
+
+ size_t size() const override { return _item_count; }
+
+ const Container& get_data() const { return _data; }
+
+ Container& get_data() { return _data; }
+
+ void resize(size_t n) override {
+ DCHECK(_item_size > 0) << "_item_size should be greater than 0";
+ _data.resize(n * _item_size);
+ _item_count = n;
+ }
+
+ MutableColumnPtr clone_resized(size_t size) const override {
+ auto res = this->create(_item_size);
+
+ if (size > 0) {
+ auto& new_col = static_cast<Self&>(*res);
+ new_col.resize(size);
+ auto* new_data = new_col._data.data();
+
+ size_t count = std::min(this->size(), size);
+ memcpy(new_data, _data.data(), count * _item_size);
+
+ if (size > count) memset(new_data + count * _item_size, 0, (size -
count) * _item_size);
+ }
+
+ return res;
+ }
+
+ void insert_indices_from(const IColumn& src, const int* indices_begin,
+ const int* indices_end) override {
+ const Self& src_vec = static_cast<const Self&>(src);
+ auto origin_size = size();
+ auto new_size = indices_end - indices_begin;
+ if (_item_size == 0) {
+ _item_size = src_vec._item_size;
+ }
+ DCHECK(_item_size == src_vec._item_size) << "dst and src should have
the same _item_size";
+ resize(origin_size + new_size);
+
+ for (int i = 0; i < new_size; ++i) {
+ int offset = indices_begin[i];
+ if (offset > -1) {
+ memcpy(&_data[(origin_size + i) * _item_size],
&src_vec._data[offset * _item_size],
+ _item_size);
+ } else {
+ memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
+ }
+ }
+ }
+
+ void clear() override {
+ _data.clear();
+ _item_count = 0;
+ }
+
+ [[noreturn]] Field operator[](size_t n) const override {
+ LOG(FATAL) << "operator[] not supported";
+ }
+
+ void get(size_t n, Field& res) const override { LOG(FATAL) << "get not
supported"; }
+
+ [[noreturn]] StringRef get_data_at(size_t n) const override {
+ LOG(FATAL) << "get_data_at not supported";
+ }
+
+ void insert(const Field& x) override { LOG(FATAL) << "insert not
supported"; }
+
+ void insert_range_from(const IColumn& src, size_t start, size_t length)
override {
+ LOG(FATAL) << "insert_range_from not supported";
+ }
+
+ void insert_data(const char* pos, size_t length) override {
+ LOG(FATAL) << "insert_data not supported";
+ }
+
+ void insert_default() override { LOG(FATAL) << "insert_default not
supported"; }
+
+ void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported";
}
+
+ StringRef serialize_value_into_arena(size_t n, Arena& arena,
+ char const*& begin) const override {
+ LOG(FATAL) << "serialize_value_into_arena not supported";
+ }
+
+ const char* deserialize_and_insert_from_arena(const char* pos) override {
+ LOG(FATAL) << "deserialize_and_insert_from_arena not supported";
+ }
+
+    // ---- Hashing ----
+    void update_hash_with_value(size_t n, SipHash& hash) const override {
+        LOG(FATAL) << "update_hash_with_value not supported";
+    }
+
+    // ---- Row selection / reordering: unsupported, never return ----
+    [[noreturn]] ColumnPtr filter(const IColumn::Filter& filt,
+                                  ssize_t result_size_hint) const override {
+        LOG(FATAL) << "filter not supported";
+    }
+
+    [[noreturn]] ColumnPtr permute(const IColumn::Permutation& perm,
+                                   size_t limit) const override {
+        LOG(FATAL) << "permute not supported";
+    }
+
+    // ---- Comparison and sorting ----
+    [[noreturn]] int compare_at(size_t n, size_t m, const IColumn& rhs,
+                                int nan_direction_hint) const override {
+        LOG(FATAL) << "compare_at not supported";
+    }
+
+    void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
+                         IColumn::Permutation& res) const override {
+        LOG(FATAL) << "get_permutation not supported";
+    }
+
+    // ---- Replication and scattering ----
+    [[noreturn]] ColumnPtr replicate(const IColumn::Offsets& offsets) const override {
+        LOG(FATAL) << "replicate not supported";
+    }
+
+    [[noreturn]] MutableColumns scatter(IColumn::ColumnIndex num_columns,
+                                        const IColumn::Selector& selector) const override {
+        LOG(FATAL) << "scatter not supported";
+    }
+
+    // ---- Statistics ----
+    void get_extremes(Field& min, Field& max) const override {
+        LOG(FATAL) << "get_extremes not supported";
+    }
+
+    // Reports `_data.size()` as the column's byte size.
+    size_t byte_size() const override {
+        return _data.size();
+    }
+
+    // Width, in bytes, of a single stored object.
+    size_t item_size() const {
+        return _item_size;
+    }
+
+    // Assigns the per-item width. A DCHECK asserts that the width is only changed
+    // while `_item_count` is 0 (or is being set to the value it already has).
+    void set_item_size(size_t size) {
+        DCHECK(_item_count == 0 || size == _item_size)
+                << "cannot reset _item_size of ColumnFixedLengthObject";
+        _item_size = size;
+    }
+
+    // Delegates to the backing container's allocated_bytes().
+    size_t allocated_bytes() const override {
+        return _data.allocated_bytes();
+    }
+
+    // In-place row replacement is not implemented for this column; both overloads
+    // only report the unsupported call through LOG(FATAL).
+    void replace_column_data(const IColumn&, size_t row,
+                             size_t self_row = 0) override {
+        LOG(FATAL) << "replace_column_data not supported";
+    }
+
+    void replace_column_data_default(size_t self_row = 0) override {
+        LOG(FATAL) << "replace_column_data_default not supported";
+    }
+
+protected:
+    // Width in bytes of one stored object; read by item_size(), written by set_item_size().
+    // NOTE(review): no in-class initializer here — confirm every constructor sets it.
+    size_t _item_size;
+    // Number of stored objects; set_item_size() only permits a width change while this is 0.
+    // NOTE(review): no in-class initializer here — confirm every constructor sets it.
+    size_t _item_count;
+    // Backing storage; byte_size() and allocated_bytes() report on this container.
+    Container _data;
+};
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/field.h b/be/src/vec/core/field.h
index cddd7037d7..4c4a275e34 100644
--- a/be/src/vec/core/field.h
+++ b/be/src/vec/core/field.h
@@ -182,6 +182,7 @@ public:
Float64 = 3,
UInt128 = 4,
Int128 = 5,
+ FixedLengthObject = 6,
/// Non-POD types.
@@ -224,6 +225,8 @@ public:
return "Decimal128";
case AggregateFunctionState:
return "AggregateFunctionState";
+ case FixedLengthObject:
+ return "FixedLengthObject";
}
LOG(FATAL) << "Bad type of Field";
@@ -378,6 +381,8 @@ public:
return get<DecimalField<Decimal128>>() <
rhs.get<DecimalField<Decimal128>>();
case Types::AggregateFunctionState:
return get<AggregateFunctionStateData>() <
rhs.get<AggregateFunctionStateData>();
+ case Types::FixedLengthObject:
+ break;
}
LOG(FATAL) << "Bad type of Field";
@@ -417,6 +422,8 @@ public:
return get<DecimalField<Decimal128>>() <=
rhs.get<DecimalField<Decimal128>>();
case Types::AggregateFunctionState:
return get<AggregateFunctionStateData>() <=
rhs.get<AggregateFunctionStateData>();
+ case Types::FixedLengthObject:
+ break;
}
LOG(FATAL) << "Bad type of Field";
return {};
@@ -452,6 +459,8 @@ public:
return get<DecimalField<Decimal128>>() ==
rhs.get<DecimalField<Decimal128>>();
case Types::AggregateFunctionState:
return get<AggregateFunctionStateData>() ==
rhs.get<AggregateFunctionStateData>();
+ case Types::FixedLengthObject:
+ break;
}
CHECK(false) << "Bad type of Field";
@@ -544,6 +553,9 @@ private:
case Types::AggregateFunctionState:
f(field.template get<AggregateFunctionStateData>());
return;
+ case Types::FixedLengthObject:
+ LOG(FATAL) << "FixedLengthObject not supported";
+ break;
}
}
diff --git a/be/src/vec/core/types.h b/be/src/vec/core/types.h
index def50d5e01..f57f41d58a 100644
--- a/be/src/vec/core/types.h
+++ b/be/src/vec/core/types.h
@@ -77,6 +77,7 @@ enum class TypeIndex {
DateV2,
DateTimeV2,
TimeV2,
+ FixedLengthObject,
};
struct Consted {
@@ -448,6 +449,8 @@ inline const char* getTypeName(TypeIndex idx) {
return TypeName<BitmapValue>::get();
case TypeIndex::HLL:
return TypeName<HyperLogLog>::get();
+ case TypeIndex::FixedLengthObject:
+ return "FixedLengthObject";
}
__builtin_unreachable();
diff --git a/be/src/vec/data_types/data_type.cpp
b/be/src/vec/data_types/data_type.cpp
index f04bd0c592..f5a771131a 100644
--- a/be/src/vec/data_types/data_type.cpp
+++ b/be/src/vec/data_types/data_type.cpp
@@ -143,6 +143,8 @@ PGenericType_TypeId IDataType::get_pdata_type(const
IDataType* data_type) {
return PGenericType::HLL;
case TypeIndex::Array:
return PGenericType::LIST;
+ case TypeIndex::FixedLengthObject:
+ return PGenericType::FIXEDLENGTHOBJECT;
default:
return PGenericType::UNKNOWN;
}
diff --git a/be/src/vec/data_types/data_type_factory.cpp
b/be/src/vec/data_types/data_type_factory.cpp
index e2d1e9d9f3..980bd547d7 100644
--- a/be/src/vec/data_types/data_type_factory.cpp
+++ b/be/src/vec/data_types/data_type_factory.cpp
@@ -280,6 +280,9 @@ DataTypePtr DataTypeFactory::create_data_type(const
PColumnMeta& pcolumn) {
DCHECK(pcolumn.children_size() == 1);
nested =
std::make_shared<DataTypeArray>(create_data_type(pcolumn.children(0)));
break;
+ case PGenericType::FIXEDLENGTHOBJECT:
+ nested = std::make_shared<DataTypeFixedLengthObject>();
+ break;
default: {
LOG(FATAL) << fmt::format("Unknown data type: {}", pcolumn.type());
return nullptr;
diff --git a/be/src/vec/data_types/data_type_factory.hpp
b/be/src/vec/data_types/data_type_factory.hpp
index 37dda1ee54..73e0a147fe 100644
--- a/be/src/vec/data_types/data_type_factory.hpp
+++ b/be/src/vec/data_types/data_type_factory.hpp
@@ -34,6 +34,7 @@
#include "vec/data_types/data_type_date.h"
#include "vec/data_types/data_type_date_time.h"
#include "vec/data_types/data_type_decimal.h"
+#include "vec/data_types/data_type_fixed_length_object.h"
#include "vec/data_types/data_type_nothing.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_number.h"
diff --git a/be/src/vec/data_types/data_type_fixed_length_object.cpp
b/be/src/vec/data_types/data_type_fixed_length_object.cpp
new file mode 100644
index 0000000000..d96945b2cd
--- /dev/null
+++ b/be/src/vec/data_types/data_type_fixed_length_object.cpp
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/data_types/data_type_fixed_length_object.h"
+
+#include "vec/aggregate_functions/aggregate_function_avg.h"
+
+namespace doris::vectorized {
+
+// Writes `column` into `buf` as:
+//   [uint32_t row_num][size_t item_size][row_num * item_size bytes of item data]
+// and returns the position just past the written bytes.
+//
+// The header fields are copied with memcpy rather than stored through
+// reinterpret_cast pointers: `buf + sizeof(uint32_t)` is not suitably aligned for a
+// size_t, and a misaligned typed store is undefined behavior (and can fault on
+// strict-alignment targets).
+char* DataTypeFixedLengthObject::serialize(const IColumn& column, char* buf) const {
+    // row num (the wire format stores it as uint32_t)
+    const size_t row_num = column.size();
+    DCHECK(static_cast<uint32_t>(row_num) == row_num)
+            << "[serialize]row count of DataTypeFixedLengthObject does not fit in uint32_t";
+    const auto row_num_u32 = static_cast<uint32_t>(row_num);
+    memcpy(buf, &row_num_u32, sizeof(uint32_t));
+    buf += sizeof(uint32_t);
+
+    // column data; constant columns are materialized before copying
+    auto ptr = column.convert_to_full_column_if_const();
+    const auto& src_col = assert_cast<const ColumnType&>(*ptr.get());
+    const size_t item_size = src_col.item_size();
+    DCHECK(item_size > 0)
+            << "[serialize]item size of DataTypeFixedLengthObject should be greater than 0";
+    memcpy(buf, &item_size, sizeof(size_t));
+    buf += sizeof(size_t);
+
+    const size_t data_bytes = row_num * item_size;
+    // Skip the copy for an empty column: memcpy with a possibly-null source is UB
+    // even when the length is 0.
+    if (data_bytes > 0) {
+        memcpy(buf, src_col.get_data().data(), data_bytes);
+    }
+    buf += data_bytes;
+
+    return buf;
+}
+
+// Reads the layout produced by serialize():
+//   [uint32_t row_num][size_t item_size][row_num * item_size bytes of item data]
+// into `column` (which must be a ColumnFixedLengthObject) and returns the position
+// just past the consumed bytes.
+//
+// Header fields are copied out with memcpy because they are not guaranteed to be
+// aligned inside `buf`; dereferencing a misaligned reinterpret_cast pointer is UB.
+const char* DataTypeFixedLengthObject::deserialize(const char* buf, IColumn* column) const {
+    // row num
+    uint32_t row_num = 0;
+    memcpy(&row_num, buf, sizeof(uint32_t));
+    buf += sizeof(uint32_t);
+    size_t item_size = 0;
+    memcpy(&item_size, buf, sizeof(size_t));
+    buf += sizeof(size_t);
+
+    DCHECK(item_size > 0)
+            << "[deserialize]item size of DataTypeFixedLengthObject should be greater than 0";
+
+    // assert_cast (as in serialize) verifies the column type in debug builds.
+    auto& dst_col = assert_cast<ColumnType&>(*column);
+    dst_col.set_item_size(item_size);
+    // column data
+    dst_col.resize(row_num);
+    const size_t data_bytes = static_cast<size_t>(row_num) * item_size;
+    // Avoid memcpy on a possibly-null destination when there is nothing to copy.
+    if (data_bytes > 0) {
+        memcpy(dst_col.get_data().data(), buf, data_bytes);
+    }
+    buf += data_bytes;
+
+    return buf;
+}
+
+// Creates an empty ColumnFixedLengthObject (ColumnType is an alias for it).
+// The constructor argument 0 is presumably the item size, i.e. "not yet known";
+// deserialize() assigns the real width via set_item_size() — confirm against the
+// ColumnFixedLengthObject constructor.
+MutableColumnPtr DataTypeFixedLengthObject::create_column() const {
+    return ColumnFixedLengthObject::create(0);
+}
+
+} // namespace doris::vectorized
\ No newline at end of file
diff --git a/be/src/vec/data_types/data_type_fixed_length_object.h
b/be/src/vec/data_types/data_type_fixed_length_object.h
new file mode 100644
index 0000000000..5d346b297d
--- /dev/null
+++ b/be/src/vec/data_types/data_type_fixed_length_object.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "vec/columns/column_fixed_length_object.h"
+#include "vec/common/typeid_cast.h"
+#include "vec/data_types/data_type.h"
+
+namespace doris::vectorized {
+
+// Data type whose values are fixed-width binary objects stored in a
+// ColumnFixedLengthObject. The aggregation node uses it (via get_serialized_type())
+// to carry fixed-length serialized aggregate states. serialize()/deserialize() move a
+// column as [uint32_t rows][size_t item_size][rows * item_size bytes].
+class DataTypeFixedLengthObject final : public IDataType {
+public:
+    using ColumnType = ColumnFixedLengthObject;
+
+    DataTypeFixedLengthObject() {}
+
+    // The type has no data members; the copy constructor default-constructs the base.
+    DataTypeFixedLengthObject(const DataTypeFixedLengthObject& other) {}
+
+    const char* get_family_name() const override { return "DataTypeFixedLengthObject"; }
+
+    TypeIndex get_type_id() const override { return TypeIndex::FixedLengthObject; }
+
+    // NOTE(review): the default is a String Field although values of this type are raw
+    // fixed-width bytes, and ColumnFixedLengthObject::insert(const Field&) is
+    // unsupported — confirm no caller inserts this default into a column.
+    Field get_default() const override { return String(); }
+
+    bool equals(const IDataType& rhs) const override { return typeid(rhs) == typeid(*this); }
+
+    // Size of the two header fields plus the column's byte_size(); expected to match
+    // what serialize() writes (assumes byte_size() == rows * item_size — confirm).
+    // serialize() materializes constant columns before copying, so the same conversion
+    // is applied here: casting a ColumnConst directly to ColumnType would be an invalid
+    // downcast (undefined behavior) and would not measure the bytes serialize() emits.
+    int64_t get_uncompressed_serialized_bytes(const IColumn& column) const override {
+        auto ptr = column.convert_to_full_column_if_const();
+        const auto& src_col = static_cast<const ColumnType&>(*ptr.get());
+        return src_col.byte_size() + sizeof(uint32_t) + sizeof(size_t);
+    }
+
+    char* serialize(const IColumn& column, char* buf) const override;
+    const char* deserialize(const char* buf, IColumn* column) const override;
+
+    MutableColumnPtr create_column() const override;
+
+    bool get_is_parametric() const override { return false; }
+    bool have_subtypes() const override { return false; }
+
+    bool is_categorial() const override { return is_value_represented_by_integer(); }
+    bool can_be_inside_low_cardinality() const override { return false; }
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vaggregation_node.cpp
b/be/src/vec/exec/vaggregation_node.cpp
index d43ae88e8b..e31f673643 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -103,6 +103,9 @@ AggregationNode::AggregationNode(ObjectPool* pool, const
TPlanNode& tnode,
}
_is_first_phase = tnode.agg_node.__isset.is_first_phase &&
tnode.agg_node.is_first_phase;
+ _use_fixed_length_serialization_opt =
+ tnode.agg_node.__isset.use_fixed_length_serialization_opt &&
+ tnode.agg_node.use_fixed_length_serialization_opt;
}
AggregationNode::~AggregationNode() = default;
@@ -275,11 +278,14 @@ Status AggregationNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
_build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
- _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTimer");
+ _serialize_key_timer = ADD_TIMER(runtime_profile(), "SerializeKeyTime");
_exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
_merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
_expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
_get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
+ _serialize_data_timer = ADD_TIMER(runtime_profile(), "SerializeDataTime");
+ _deserialize_data_timer = ADD_TIMER(runtime_profile(),
"DeserializeDataTime");
+
_data_mem_tracker = std::make_unique<MemTracker>("AggregationNode:Data");
_intermediate_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc =
state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
@@ -565,20 +571,34 @@ Status
AggregationNode::_serialize_without_key(RuntimeState* state, Block* block
MutableColumns value_columns(agg_size);
std::vector<DataTypePtr> data_types(agg_size);
-
// will serialize data to string column
- std::vector<VectorBufferWriter> value_buffer_writers;
- auto serialize_string_type = std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- data_types[i] = serialize_string_type;
- value_columns[i] = serialize_string_type->create_column();
-
value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get()));
- }
+ if (_use_fixed_length_serialization_opt) {
+ auto serialize_string_type = std::make_shared<DataTypeString>();
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ data_types[i] =
_aggregate_evaluators[i]->function()->get_serialized_type();
+ value_columns[i] =
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize(
- _agg_data.without_key + _offsets_of_aggregate_states[i],
value_buffer_writers[i]);
- value_buffer_writers[i].commit();
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+
_aggregate_evaluators[i]->function()->serialize_without_key_to_column(
+ _agg_data.without_key + _offsets_of_aggregate_states[i],
value_columns[i]);
+ }
+ } else {
+ std::vector<VectorBufferWriter> value_buffer_writers;
+ auto serialize_string_type = std::make_shared<DataTypeString>();
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ data_types[i] = serialize_string_type;
+ value_columns[i] = serialize_string_type->create_column();
+ value_buffer_writers.emplace_back(
+ *reinterpret_cast<ColumnString*>(value_columns[i].get()));
+ }
+
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->function()->serialize(
+ _agg_data.without_key + _offsets_of_aggregate_states[i],
+ value_buffer_writers[i]);
+ value_buffer_writers[i].commit();
+ }
}
{
ColumnsWithTypeAndName data_with_schema;
@@ -607,8 +627,6 @@ Status AggregationNode::_execute_without_key(Block* block) {
Status AggregationNode::_merge_without_key(Block* block) {
SCOPED_TIMER(_merge_timer);
DCHECK(_agg_data.without_key != nullptr);
- std::unique_ptr<char[]> deserialize_buffer(new
char[_total_size_of_aggregate_states]);
- int rows = block->rows();
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
if (_aggregate_evaluators[i]->is_merge()) {
int col_id = _get_slot_column_id(_aggregate_evaluators[i]);
@@ -617,12 +635,21 @@ Status AggregationNode::_merge_without_key(Block* block) {
column =
((ColumnNullable*)column.get())->get_nested_column_ptr();
}
- for (int j = 0; j < rows; ++j) {
- VectorBufferReader
buffer_reader(((ColumnString*)(column.get()))->get_data_at(j));
-
- _aggregate_evaluators[i]->function()->deserialize_and_merge(
- _agg_data.without_key +
_offsets_of_aggregate_states[i], buffer_reader,
+ SCOPED_TIMER(_deserialize_data_timer);
+ if (_use_fixed_length_serialization_opt) {
+
_aggregate_evaluators[i]->function()->deserialize_and_merge_from_column(
+ _agg_data.without_key +
_offsets_of_aggregate_states[i], *column,
&_agg_arena_pool);
+ } else {
+ const int rows = block->rows();
+ for (int j = 0; j < rows; ++j) {
+ VectorBufferReader buffer_reader(
+ ((ColumnString*)(column.get()))->get_data_at(j));
+
+
_aggregate_evaluators[i]->function()->deserialize_and_merge(
+ _agg_data.without_key +
_offsets_of_aggregate_states[i], buffer_reader,
+ &_agg_arena_pool);
+ }
}
} else {
_aggregate_evaluators[i]->execute_single_add(
@@ -872,53 +899,59 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
// do not try to do agg, just init and serialize directly
return the out_block
if (!_should_expand_preagg_hash_tables()) {
ret_flag = true;
- if (_streaming_pre_places.size() < rows) {
- _streaming_pre_places.reserve(rows);
- for (size_t i = _streaming_pre_places.size(); i <
rows; ++i) {
-
_streaming_pre_places.emplace_back(_agg_arena_pool.aligned_alloc(
- _total_size_of_aggregate_states,
_align_aggregate_states));
- }
- }
-
- for (size_t i = 0; i < rows; ++i) {
- _create_agg_status(_streaming_pre_places[i]);
- }
-
- for (int i = 0; i < _aggregate_evaluators.size(); ++i)
{
- _aggregate_evaluators[i]->execute_batch_add(
- in_block, _offsets_of_aggregate_states[i],
- _streaming_pre_places.data(),
&_agg_arena_pool, false);
- }
// will serialize value data to string column
- std::vector<VectorBufferWriter> value_buffer_writers;
bool mem_reuse = out_block->mem_reuse();
- auto serialize_string_type =
std::make_shared<DataTypeString>();
+
+ std::vector<DataTypePtr> data_types;
MutableColumns value_columns;
- for (int i = 0; i < _aggregate_evaluators.size(); ++i)
{
- if (mem_reuse) {
- value_columns.emplace_back(
-
std::move(*out_block->get_by_position(i + key_size).column)
- .mutate());
- } else {
- // slot type of value it should always be
string type
-
value_columns.emplace_back(serialize_string_type->create_column());
+ if (_use_fixed_length_serialization_opt) {
+ for (int i = 0; i < _aggregate_evaluators.size();
++i) {
+ auto data_type =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ if (mem_reuse) {
+ value_columns.emplace_back(
+
std::move(*out_block->get_by_position(i + key_size)
+ .column)
+ .mutate());
+ } else {
+ // slot type of value it should always be
string type
+
value_columns.emplace_back(_aggregate_evaluators[i]
+
->function()
+
->create_serialize_column());
+ }
+ data_types.emplace_back(data_type);
}
- value_buffer_writers.emplace_back(
-
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
- }
- for (size_t j = 0; j < rows; ++j) {
- for (size_t i = 0; i <
_aggregate_evaluators.size(); ++i) {
-
_aggregate_evaluators[i]->function()->serialize(
- _streaming_pre_places[j] +
_offsets_of_aggregate_states[i],
- value_buffer_writers[i]);
- value_buffer_writers[i].commit();
+ for (int i = 0; i != _aggregate_evaluators.size();
++i) {
+ SCOPED_TIMER(_serialize_data_timer);
+
_aggregate_evaluators[i]->streaming_agg_serialize_to_column(
+ in_block, value_columns[i], rows,
&_agg_arena_pool);
+ }
+ } else {
+ std::vector<VectorBufferWriter>
value_buffer_writers;
+ auto serialize_string_type =
std::make_shared<DataTypeString>();
+ for (int i = 0; i < _aggregate_evaluators.size();
++i) {
+ if (mem_reuse) {
+ value_columns.emplace_back(
+
std::move(*out_block->get_by_position(i + key_size)
+ .column)
+ .mutate());
+ } else {
+ // slot type of value it should always be
string type
+ value_columns.emplace_back(
+
serialize_string_type->create_column());
+ }
+ data_types.emplace_back(serialize_string_type);
+ value_buffer_writers.emplace_back(
+
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
}
- }
- for (size_t i = 0; i < rows; ++i) {
- _destroy_agg_status(_streaming_pre_places[i]);
+ for (int i = 0; i != _aggregate_evaluators.size();
++i) {
+ SCOPED_TIMER(_serialize_data_timer);
+
_aggregate_evaluators[i]->streaming_agg_serialize(
+ in_block, value_buffer_writers[i],
rows, &_agg_arena_pool);
+ }
}
if (!mem_reuse) {
@@ -931,7 +964,7 @@ Status
AggregationNode::_pre_agg_with_serialized_key(doris::vectorized::Block* i
}
for (int i = 0; i < value_columns.size(); ++i) {
columns_with_schema.emplace_back(std::move(value_columns[i]),
-
serialize_string_type, "");
+
data_types[i], "");
}
out_block->swap(Block(columns_with_schema));
} else {
@@ -1073,19 +1106,6 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
}
}
- // will serialize data to string column
- std::vector<VectorBufferWriter> value_buffer_writers;
- auto serialize_string_type = std::make_shared<DataTypeString>();
- for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
- value_data_types[i] = serialize_string_type;
- if (mem_reuse) {
- value_columns[i] = std::move(*block->get_by_position(i +
key_size).column).mutate();
- } else {
- value_columns[i] = serialize_string_type->create_column();
- }
-
value_buffer_writers.emplace_back(*reinterpret_cast<ColumnString*>(value_columns[i].get()));
- }
-
std::visit(
[&](auto&& agg_method) -> void {
agg_method.init_once();
@@ -1095,7 +1115,7 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
const auto size = std::min(data.size(),
size_t(state->batch_size()));
using KeyType = std::decay_t<decltype(iter->get_first())>;
std::vector<KeyType> keys(size);
- std::vector<AggregateDataPtr> values(size);
+ std::vector<AggregateDataPtr> values(size + 1);
size_t num_rows = 0;
while (iter != data.end() && num_rows < state->batch_size()) {
@@ -1107,31 +1127,60 @@ Status
AggregationNode::_serialize_with_serialized_key_result(RuntimeState* stat
agg_method.insert_keys_into_columns(keys, key_columns,
num_rows, _probe_key_sz);
- for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
- _aggregate_evaluators[i]->function()->serialize_vec(
- values, _offsets_of_aggregate_states[i],
value_buffer_writers[i],
- num_rows);
- }
-
if (iter == data.end()) {
if (agg_method.data.has_null_key_data()) {
DCHECK(key_columns.size() == 1);
DCHECK(key_columns[0]->is_nullable());
if (agg_method.data.has_null_key_data()) {
key_columns[0]->insert_data(nullptr, 0);
- auto mapped = agg_method.data.get_null_key_data();
- for (size_t i = 0; i <
_aggregate_evaluators.size(); ++i) {
-
_aggregate_evaluators[i]->function()->serialize(
- mapped +
_offsets_of_aggregate_states[i],
- value_buffer_writers[i]);
- value_buffer_writers[i].commit();
- }
+ values[num_rows] =
agg_method.data.get_null_key_data();
+ ++num_rows;
*eos = true;
}
} else {
*eos = true;
}
}
+
+ if (_use_fixed_length_serialization_opt) {
+ SCOPED_TIMER(_serialize_data_timer);
+ for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ value_data_types[i] =
+
_aggregate_evaluators[i]->function()->get_serialized_type();
+ if (mem_reuse) {
+ value_columns[i] =
+ std::move(*block->get_by_position(i +
key_size).column)
+ .mutate();
+ } else {
+ value_columns[i] =
+
_aggregate_evaluators[i]->function()->create_serialize_column();
+ }
+
_aggregate_evaluators[i]->function()->serialize_to_column(
+ values, _offsets_of_aggregate_states[i],
value_columns[i],
+ num_rows);
+ }
+ } else {
+ SCOPED_TIMER(_serialize_data_timer);
+ std::vector<VectorBufferWriter> value_buffer_writers;
+ auto serialize_string_type =
std::make_shared<DataTypeString>();
+ for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
+ value_data_types[i] = serialize_string_type;
+ if (mem_reuse) {
+ value_columns[i] =
+ std::move(*block->get_by_position(i +
key_size).column)
+ .mutate();
+ } else {
+ value_columns[i] =
serialize_string_type->create_column();
+ }
+ value_buffer_writers.emplace_back(
+
*reinterpret_cast<ColumnString*>(value_columns[i].get()));
+ }
+ for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) {
+ _aggregate_evaluators[i]->function()->serialize_vec(
+ values, _offsets_of_aggregate_states[i],
value_buffer_writers[i],
+ num_rows);
+ }
+ }
},
_agg_data._aggregated_method_variant);
diff --git a/be/src/vec/exec/vaggregation_node.h
b/be/src/vec/exec/vaggregation_node.h
index 6960c89c4f..687739dd07 100644
--- a/be/src/vec/exec/vaggregation_node.h
+++ b/be/src/vec/exec/vaggregation_node.h
@@ -649,6 +649,7 @@ private:
bool _needs_finalize;
bool _is_merge;
bool _is_first_phase;
+ bool _use_fixed_length_serialization_opt;
std::unique_ptr<MemPool> _mem_pool;
std::unique_ptr<MemTracker> _data_mem_tracker;
@@ -669,11 +670,12 @@ private:
RuntimeProfile::Counter* _merge_timer;
RuntimeProfile::Counter* _expr_timer;
RuntimeProfile::Counter* _get_results_timer;
+ RuntimeProfile::Counter* _serialize_data_timer;
+ RuntimeProfile::Counter* _deserialize_data_timer;
bool _is_streaming_preagg;
Block _preagg_block = Block();
bool _should_expand_hash_table = true;
- std::vector<char*> _streaming_pre_places;
bool _should_limit_output = false;
bool _reach_limit = false;
@@ -802,13 +804,23 @@ private:
std::unique_ptr<char[]> deserialize_buffer(
new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
- _aggregate_evaluators[i]->function()->deserialize_vec(
- deserialize_buffer.get(),
(ColumnString*)(column.get()),
- &_agg_arena_pool, rows);
+ if (_use_fixed_length_serialization_opt) {
+ SCOPED_TIMER(_deserialize_data_timer);
+
_aggregate_evaluators[i]->function()->deserialize_from_column(
+ deserialize_buffer.get(), *column,
&_agg_arena_pool, rows);
+ } else {
+ SCOPED_TIMER(_deserialize_data_timer);
+ _aggregate_evaluators[i]->function()->deserialize_vec(
+ deserialize_buffer.get(),
(ColumnString*)(column.get()),
+ &_agg_arena_pool, rows);
+ }
_aggregate_evaluators[i]->function()->merge_vec_selected(
places.data(), _offsets_of_aggregate_states[i],
deserialize_buffer.get(), &_agg_arena_pool, rows);
+
_aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+ rows);
+
} else {
_aggregate_evaluators[i]->execute_batch_add_selected(
block, _offsets_of_aggregate_states[i],
places.data(),
@@ -829,13 +841,23 @@ private:
std::unique_ptr<char[]> deserialize_buffer(
new
char[_aggregate_evaluators[i]->function()->size_of_data() * rows]);
- _aggregate_evaluators[i]->function()->deserialize_vec(
- deserialize_buffer.get(),
(ColumnString*)(column.get()),
- &_agg_arena_pool, rows);
+ if (_use_fixed_length_serialization_opt) {
+ SCOPED_TIMER(_deserialize_data_timer);
+
_aggregate_evaluators[i]->function()->deserialize_from_column(
+ deserialize_buffer.get(), *column,
&_agg_arena_pool, rows);
+ } else {
+ SCOPED_TIMER(_deserialize_data_timer);
+ _aggregate_evaluators[i]->function()->deserialize_vec(
+ deserialize_buffer.get(),
(ColumnString*)(column.get()),
+ &_agg_arena_pool, rows);
+ }
_aggregate_evaluators[i]->function()->merge_vec(
places.data(), _offsets_of_aggregate_states[i],
deserialize_buffer.get(), &_agg_arena_pool, rows);
+
_aggregate_evaluators[i]->function()->destroy_vec(deserialize_buffer.get(),
+ rows);
+
} else {
_aggregate_evaluators[i]->execute_batch_add(block,
_offsets_of_aggregate_states[i],
diff --git a/be/src/vec/exprs/vectorized_agg_fn.cpp
b/be/src/vec/exprs/vectorized_agg_fn.cpp
index 2631165777..28433f7b85 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.cpp
+++ b/be/src/vec/exprs/vectorized_agg_fn.cpp
@@ -163,6 +163,20 @@ void AggFnEvaluator::execute_batch_add_selected(Block*
block, size_t offset,
_function->add_batch_selected(block->rows(), places, offset,
_agg_columns.data(), arena);
}
+// Streaming pre-aggregation path: evaluates this aggregate's arguments over `block`,
+// then asks the aggregate function to serialize `num_rows` rows into `buf`.
+// Argument evaluation happens before SCOPED_TIMER, so only the function call itself
+// runs inside the _exec_timer scope.
+void AggFnEvaluator::streaming_agg_serialize(Block* block, BufferWritable& buf,
+                                             const size_t num_rows, Arena* arena) {
+    _calc_argment_columns(block);
+
+    SCOPED_TIMER(_exec_timer);
+    auto* const argument_columns = _agg_columns.data();
+    _function->streaming_agg_serialize(argument_columns, buf, num_rows, arena);
+}
+
+// Column-based variant of streaming_agg_serialize(): evaluates the arguments over
+// `block`, then lets the aggregate function write `num_rows` rows into `dst`.
+// Only the function call runs inside the _exec_timer scope.
+void AggFnEvaluator::streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+                                                       const size_t num_rows, Arena* arena) {
+    _calc_argment_columns(block);
+
+    SCOPED_TIMER(_exec_timer);
+    auto* const argument_columns = _agg_columns.data();
+    _function->streaming_agg_serialize_to_column(argument_columns, dst, num_rows, arena);
+}
+
void AggFnEvaluator::insert_result_info(AggregateDataPtr place, IColumn*
column) {
_function->insert_result_into(place, *column);
}
diff --git a/be/src/vec/exprs/vectorized_agg_fn.h
b/be/src/vec/exprs/vectorized_agg_fn.h
index 2185de7617..fa025edec1 100644
--- a/be/src/vec/exprs/vectorized_agg_fn.h
+++ b/be/src/vec/exprs/vectorized_agg_fn.h
@@ -61,6 +61,12 @@ public:
void execute_batch_add_selected(Block* block, size_t offset,
AggregateDataPtr* places,
Arena* arena = nullptr);
+    // Evaluates this aggregate's argument columns over `block`, then forwards them to
+    // the aggregate function's streaming_agg_serialize(), which writes into `buf`.
+    // `num_rows` is passed through unchanged (presumably the input row count).
+    void streaming_agg_serialize(Block* block, BufferWritable& buf, const size_t num_rows,
+                                 Arena* arena);
+
+    // Same as streaming_agg_serialize(), but the aggregate function writes into the
+    // column `dst` instead of a BufferWritable.
+    void streaming_agg_serialize_to_column(Block* block, MutableColumnPtr& dst,
+                                           const size_t num_rows, Arena* arena);
+
void insert_result_info(AggregateDataPtr place, IColumn* column);
void insert_result_info_vec(const std::vector<AggregateDataPtr>& place,
size_t offset,
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index c8c89a6fa2..aca046c7a1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -273,6 +273,7 @@ public class AggregationNode extends PlanNode {
msg.agg_node.setAggSortInfos(aggSortInfos);
msg.agg_node.setUseStreamingPreaggregation(useStreamingPreagg);
msg.agg_node.setIsFirstPhase(aggInfo.isFirstPhase());
+ msg.agg_node.setUseFixedLengthSerializationOpt(true);
List<Expr> groupingExprs = aggInfo.getGroupingExprs();
if (groupingExprs != null) {
msg.agg_node.setGroupingExprs(Expr.treesToThrift(groupingExprs));
diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto
index c5f47d2ef9..2cb44154cf 100644
--- a/gensrc/proto/types.proto
+++ b/gensrc/proto/types.proto
@@ -101,6 +101,7 @@ message PGenericType {
NOTHING = 27;
DATEV2 = 28;
DATETIMEV2 = 29;
+ FIXEDLENGTHOBJECT = 30;
UNKNOWN = 999;
}
required TypeId id = 2;
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index bbe1e00c20..b214eeb496 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -561,7 +561,8 @@ struct TAggregationNode {
5: required bool need_finalize
6: optional bool use_streaming_preaggregation
7: optional list<TSortInfo> agg_sort_infos
- 8: optional bool is_first_phase;
+ 8: optional bool is_first_phase
+ 9: optional bool use_fixed_length_serialization_opt
}
struct TRepeatNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]