This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch tpch500 in repository https://gitbox.apache.org/repos/asf/doris.git
commit e32fc54502525a418ea3fa87d8e9ad6b4f22aa3b Author: HappenLee <happen...@hotmail.com> AuthorDate: Thu Dec 28 16:55:51 2023 +0800 refactor the serlize code support decimal --- be/cmake/thirdparty.cmake | 1 + be/src/vec/core/block.cpp | 55 +++++----- be/src/vec/data_types/data_type.h | 1 + be/src/vec/data_types/data_type_decimal.cpp | 54 ++++++--- be/src/vec/data_types/data_type_nullable.cpp | 45 ++++++-- be/src/vec/data_types/data_type_number_base.cpp | 45 ++++++-- be/src/vec/data_types/data_type_string.cpp | 139 +++++++++++------------- 7 files changed, 204 insertions(+), 136 deletions(-) diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake index 2758bb9b93e..5c07f79d6e4 100644 --- a/be/cmake/thirdparty.cmake +++ b/be/cmake/thirdparty.cmake @@ -154,6 +154,7 @@ add_thirdparty(com_err) add_thirdparty(k5crypto) add_thirdparty(gssapi_krb5) add_thirdparty(dragonbox_to_chars LIB64) +add_thirdparty(streamvbyte LIB64) target_include_directories(dragonbox_to_chars INTERFACE "${THIRDPARTY_DIR}/include/dragonbox-1.1.3") if (OS_MACOSX) diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp index 0b6f6eaaa3b..355b7efd597 100644 --- a/be/src/vec/core/block.cpp +++ b/be/src/vec/core/block.cpp @@ -866,33 +866,34 @@ Status Block::serialize(int be_exec_version, PBlock* pblock, *uncompressed_bytes = content_uncompressed_size; // compress - if (config::compress_rowbatches && content_uncompressed_size > 0) { - SCOPED_RAW_TIMER(&_compress_time_ns); - pblock->set_compression_type(compression_type); - pblock->set_uncompressed_size(content_uncompressed_size); - - BlockCompressionCodec* codec; - RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); - - faststring buf_compressed; - RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress( - Slice(column_values.data(), content_uncompressed_size), &buf_compressed)); - size_t compressed_size = buf_compressed.size(); - if (LIKELY(compressed_size < content_uncompressed_size)) { - pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); - pblock->set_compressed(true); - *compressed_bytes = compressed_size; - } else { - pblock->set_column_values(std::move(column_values)); - *compressed_bytes = content_uncompressed_size; - } - - VLOG_ROW << "uncompressed size: " << content_uncompressed_size - << ", compressed size: " << compressed_size; - } else { - pblock->set_column_values(std::move(column_values)); - *compressed_bytes = content_uncompressed_size; - } + // if (compression_type != segment_v2::NO_COMPRESSION && content_uncompressed_size > 0) { + // SCOPED_RAW_TIMER(&_compress_time_ns); + // pblock->set_compression_type(compression_type); + // pblock->set_uncompressed_size(content_uncompressed_size); + // + // BlockCompressionCodec* codec; + // RETURN_IF_ERROR(get_block_compression_codec(compression_type, &codec)); + // + // faststring buf_compressed; + // RETURN_IF_ERROR_OR_CATCH_EXCEPTION(codec->compress( + // Slice(column_values.data(), content_uncompressed_size), &buf_compressed)); + // size_t compressed_size = buf_compressed.size(); + // if (LIKELY(compressed_size < content_uncompressed_size)) { + // pblock->set_column_values(buf_compressed.data(), buf_compressed.size()); + // pblock->set_compressed(true); + // *compressed_bytes = compressed_size; + // } else { + // pblock->set_column_values(std::move(column_values)); + // *compressed_bytes = content_uncompressed_size; + // } + // + // VLOG_ROW << "uncompressed size: " << content_uncompressed_size + // << ", compressed size: " << compressed_size; + // } else { + *compressed_bytes = buf - column_values.data(); + column_values.resize(*compressed_bytes); + pblock->set_column_values(std::move(column_values)); + // } if (!allow_transfer_large_data && *compressed_bytes >= std::numeric_limits<int32_t>::max()) { return Status::InternalError("The block is large than 2GB({}), can not send by Protobuf.", *compressed_bytes); diff --git a/be/src/vec/data_types/data_type.h b/be/src/vec/data_types/data_type.h index 000efbd35b7..10bcbf8d775 100644 --- a/be/src/vec/data_types/data_type.h +++ b/be/src/vec/data_types/data_type.h @@ -55,6 +55,7 @@ class Field; using DataTypePtr = std::shared_ptr<const IDataType>; using DataTypes = std::vector<DataTypePtr>; +constexpr auto ENCODE_SIZE_LIMIT = 256; /** Properties of data type. * Contains methods for serialization/deserialization. diff --git a/be/src/vec/data_types/data_type_decimal.cpp b/be/src/vec/data_types/data_type_decimal.cpp index 38c3468c5df..954b9332d4f 100644 --- a/be/src/vec/data_types/data_type_decimal.cpp +++ b/be/src/vec/data_types/data_type_decimal.cpp @@ -22,6 +22,7 @@ #include <fmt/format.h> #include <gen_cpp/data.pb.h> +#include <streamvbyte.h> #include <string.h> #include <utility> @@ -101,39 +102,62 @@ Status DataTypeDecimal<T>::from_string(ReadBuffer& rb, IColumn* column) const { DataTypeDecimalSerDe<T>::get_primitive_type()); } +inline size_t upper_int32(size_t size) { + return (3 + size) / 4.0; +} + // binary: row_num | value1 | value2 | ... template <typename T> int64_t DataTypeDecimal<T>::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - return sizeof(uint32_t) + column.size() * sizeof(FieldType); + auto size = sizeof(T) * column.size(); + if (size <= ENCODE_SIZE_LIMIT) { + return sizeof(uint32_t) + size; + } else { + return sizeof(uint32_t) + sizeof(size_t) + + std::max(size, streamvbyte_max_compressedbytes(upper_int32(size))); + } } template <typename T> char* DataTypeDecimal<T>::serialize(const IColumn& column, char* buf, int be_exec_version) const { // row num - const auto row_num = column.size(); - *reinterpret_cast<uint32_t*>(buf) = row_num; + const auto mem_size = column.size() * sizeof(T); + *reinterpret_cast<uint32_t*>(buf) = mem_size; buf += sizeof(uint32_t); - // column values + // column data auto ptr = column.convert_to_full_column_if_const(); - const auto* origin_data = assert_cast<const ColumnType&>(*ptr.get()).get_data().data(); - memcpy(buf, origin_data, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); - return buf; + const auto* origin_data = assert_cast<const ColumnDecimal<T>&>(*ptr.get()).get_data().data(); + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(buf, origin_data, mem_size); + return buf + mem_size; + } + + auto encode_size = streamvbyte_encode(reinterpret_cast<const uint32_t*>(origin_data), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast<size_t*>(buf) = encode_size; + buf += sizeof(size_t); + return buf + encode_size; } template <typename T> const char* DataTypeDecimal<T>::deserialize(const char* buf, IColumn* column, int be_exec_version) const { // row num - uint32_t row_num = *reinterpret_cast<const uint32_t*>(buf); + uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf); buf += sizeof(uint32_t); - // column values - auto& container = assert_cast<ColumnType*>(column)->get_data(); - container.resize(row_num); - memcpy(container.data(), buf, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); - return buf; + // column data + auto& container = assert_cast<ColumnDecimal<T>*>(column)->get_data(); + container.resize(mem_size / sizeof(T)); + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(container.data(), buf, mem_size); + return buf + mem_size; + } + + size_t encode_size = *reinterpret_cast<const size_t*>(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(container.data()), upper_int32(mem_size)); + return buf + encode_size; } template <typename T> diff --git a/be/src/vec/data_types/data_type_nullable.cpp b/be/src/vec/data_types/data_type_nullable.cpp index f5bcc1df2c3..908015317b6 100644 --- a/be/src/vec/data_types/data_type_nullable.cpp +++ b/be/src/vec/data_types/data_type_nullable.cpp @@ -23,6 +23,7 @@ #include <fmt/format.h> #include <gen_cpp/data.pb.h> #include <glog/logging.h> +#include <streamvbyte.h> #include <string.h> #include <utility> @@ -93,13 +94,22 @@ Status DataTypeNullable::from_string(ReadBuffer& rb, IColumn* column) const { return Status::OK(); } +inline size_t upper_int32(size_t size) { + return (3 + size) / 4.0; +} + // binary: row num | <null array> | <values array> // <null array>: is_null1 | is_null2 | ... // <values array>: value1 | value2 | ...> int64_t DataTypeNullable::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - int64_t size = sizeof(uint32_t); - size += sizeof(bool) * column.size(); + auto size = sizeof(bool) * column.size(); + if (size <= ENCODE_SIZE_LIMIT) { + size += sizeof(uint32_t); + } else { + size += (sizeof(uint32_t) + sizeof(size_t) + + std::max(size, streamvbyte_max_compressedbytes(upper_int32(size)))); + } size += nested_data_type->get_uncompressed_serialized_bytes( assert_cast<const ColumnNullable&>(*column.convert_to_full_column_if_const()) .get_nested_column(), @@ -112,11 +122,20 @@ char* DataTypeNullable::serialize(const IColumn& column, char* buf, int be_exec_ const ColumnNullable& col = assert_cast<const ColumnNullable&>(*ptr.get()); // row num - *reinterpret_cast<uint32_t*>(buf) = column.size(); + auto mem_size = col.size() * sizeof(bool); + *reinterpret_cast<uint32_t*>(buf) = mem_size; buf += sizeof(uint32_t); // null flags - memcpy(buf, col.get_null_map_data().data(), column.size() * sizeof(bool)); - buf += column.size() * sizeof(bool); + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(buf, col.get_null_map_data().data(), mem_size); + buf += mem_size; + } else { + auto encode_size = streamvbyte_encode( + reinterpret_cast<const uint32_t*>(col.get_null_map_data().data()), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast<size_t*>(buf) = encode_size; + buf += (sizeof(size_t) + encode_size); + } // data values return nested_data_type->serialize(col.get_nested_column(), buf, be_exec_version); } @@ -125,12 +144,20 @@ const char* DataTypeNullable::deserialize(const char* buf, IColumn* column, int be_exec_version) const { ColumnNullable* col = assert_cast<ColumnNullable*>(column); // row num - uint32_t row_num = *reinterpret_cast<const uint32_t*>(buf); + uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf); buf += sizeof(uint32_t); // null flags - col->get_null_map_data().resize(row_num); - memcpy(col->get_null_map_data().data(), buf, row_num * sizeof(bool)); - buf += row_num * sizeof(bool); + col->get_null_map_data().resize(mem_size / sizeof(bool)); + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(col->get_null_map_data().data(), buf, mem_size); + buf += mem_size; + } else { + size_t encode_size = *reinterpret_cast<const size_t*>(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(col->get_null_map_data().data()), + upper_int32(mem_size)); + buf += encode_size; + } // data values IColumn& nested = col->get_nested_column(); return nested_data_type->deserialize(buf, &nested, be_exec_version); diff --git a/be/src/vec/data_types/data_type_number_base.cpp b/be/src/vec/data_types/data_type_number_base.cpp index bd73fc4adc8..372ad23ce32 100644 --- a/be/src/vec/data_types/data_type_number_base.cpp +++ b/be/src/vec/data_types/data_type_number_base.cpp @@ -22,11 +22,11 @@ #include <fmt/format.h> #include <glog/logging.h> +#include <streamvbyte.h> #include <string.h> #include <limits> #include <type_traits> -#include <utility> #include "gutil/strings/numbers.h" #include "runtime/large_int_value.h" @@ -158,42 +158,63 @@ std::string DataTypeNumberBase<T>::to_string(const IColumn& column, size_t row_n } } +inline size_t upper_int32(size_t size) { + return (3 + size) / 4.0; +} + // binary: row num | value1 | value2 | ... template <typename T> int64_t DataTypeNumberBase<T>::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { - return sizeof(uint32_t) + column.size() * sizeof(FieldType); + auto size = sizeof(T) * column.size(); + if (size <= ENCODE_SIZE_LIMIT) { + return sizeof(uint32_t) + size; + } else { + return sizeof(uint32_t) + sizeof(size_t) + + std::max(size, streamvbyte_max_compressedbytes(upper_int32(size))); + } } template <typename T> char* DataTypeNumberBase<T>::serialize(const IColumn& column, char* buf, int be_exec_version) const { // row num - const auto row_num = column.size(); - *reinterpret_cast<uint32_t*>(buf) = row_num; + const auto mem_size = column.size() * sizeof(T); + *reinterpret_cast<uint32_t*>(buf) = mem_size; buf += sizeof(uint32_t); // column data auto ptr = column.convert_to_full_column_if_const(); const auto* origin_data = assert_cast<const ColumnVector<T>&>(*ptr.get()).get_data().data(); - memcpy(buf, origin_data, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(buf, origin_data, mem_size); + return buf + mem_size; + } - return buf; + auto encode_size = streamvbyte_encode(reinterpret_cast<const uint32_t*>(origin_data), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast<size_t*>(buf) = encode_size; + buf += sizeof(size_t); + return buf + encode_size; } template <typename T> const char* DataTypeNumberBase<T>::deserialize(const char* buf, IColumn* column, int be_exec_version) const { // row num - uint32_t row_num = *reinterpret_cast<const uint32_t*>(buf); + uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf); buf += sizeof(uint32_t); // column data auto& container = assert_cast<ColumnVector<T>*>(column)->get_data(); - container.resize(row_num); - memcpy(container.data(), buf, row_num * sizeof(FieldType)); - buf += row_num * sizeof(FieldType); + container.resize(mem_size / sizeof(T)); + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(container.data(), buf, mem_size); + return buf + mem_size; + } - return buf; + size_t encode_size = *reinterpret_cast<const size_t*>(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(container.data()), upper_int32(mem_size)); + return buf + encode_size; } template <typename T> diff --git a/be/src/vec/data_types/data_type_string.cpp b/be/src/vec/data_types/data_type_string.cpp index bf23705bb7a..35a4de64683 100644 --- a/be/src/vec/data_types/data_type_string.cpp +++ b/be/src/vec/data_types/data_type_string.cpp @@ -20,11 +20,10 @@ #include "vec/data_types/data_type_string.h" +#include <lz4/lz4.h> +#include <streamvbyte.h> #include <string.h> -#include <typeinfo> -#include <utility> - #include "vec/columns/column.h" #include "vec/columns/column_const.h" #include "vec/columns/column_string.h" @@ -73,6 +72,9 @@ bool DataTypeString::equals(const IDataType& rhs) const { return typeid(rhs) == typeid(*this); } +inline size_t upper_int32(size_t size) { + return (3 + size) / 4.0; +} // binary: <size array> | total length | <value array> // <size array> : row num | offset1 |offset2 | ... // <value array> : <value1> | <value2 | ... @@ -80,57 +82,58 @@ int64_t DataTypeString::get_uncompressed_serialized_bytes(const IColumn& column, int be_exec_version) const { auto ptr = column.convert_to_full_column_if_const(); const auto& data_column = assert_cast<const ColumnString&>(*ptr.get()); - - if (be_exec_version == 0) { - return sizeof(IColumn::Offset) * (column.size() + 1) + sizeof(uint64_t) + - data_column.get_chars().size() + column.size(); + int64_t size = sizeof(uint32_t) + sizeof(uint64_t); + if (auto offsets_size = data_column.size() * sizeof(IColumn::Offset); + offsets_size <= ENCODE_SIZE_LIMIT) { + size += offsets_size; + } else { + size += sizeof(size_t) + + std::max(offsets_size, streamvbyte_max_compressedbytes(upper_int32(offsets_size))); } - return sizeof(IColumn::Offset) * (column.size() + 1) + sizeof(uint64_t) + - data_column.get_chars().size(); + if (auto bytes = data_column.get_chars().size(); bytes <= ENCODE_SIZE_LIMIT) { + size += bytes; + } else { + size += sizeof(size_t) + + std::max(bytes, streamvbyte_max_compressedbytes(upper_int32(bytes))); + } + return size; } char* DataTypeString::serialize(const IColumn& column, char* buf, int be_exec_version) const { auto ptr = column.convert_to_full_column_if_const(); const auto& data_column = assert_cast<const ColumnString&>(*ptr.get()); - if (be_exec_version == 0) { - // row num - *reinterpret_cast<IColumn::Offset*>(buf) = column.size(); - buf += sizeof(IColumn::Offset); - // offsets - for (int i = 0; i < column.size(); i++) { - *reinterpret_cast<IColumn::Offset*>(buf) = data_column.get_offsets()[i] + i + 1; - buf += sizeof(IColumn::Offset); - } - // total length - *reinterpret_cast<uint64_t*>(buf) = data_column.get_chars().size() + column.size(); - buf += sizeof(uint64_t); - // values - for (int i = 0; i < column.size(); i++) { - auto data = data_column.get_data_at(i); - memcpy(buf, data.data, data.size); - buf += data.size; - *buf = '\0'; - buf++; - } - return buf; - } - // row num - *reinterpret_cast<IColumn::Offset*>(buf) = column.size(); - buf += sizeof(IColumn::Offset); + uint32_t mem_size = data_column.size() * sizeof(IColumn::Offset); + *reinterpret_cast<uint32_t*>(buf) = mem_size; + buf += sizeof(uint32_t); // offsets - memcpy(buf, data_column.get_offsets().data(), column.size() * sizeof(IColumn::Offset)); - buf += column.size() * sizeof(IColumn::Offset); - // total length + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(buf, data_column.get_offsets().data(), mem_size); + buf += mem_size; + } else { + auto encode_size = streamvbyte_encode( + reinterpret_cast<const uint32_t*>(data_column.get_offsets().data()), + upper_int32(mem_size), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast<size_t*>(buf) = encode_size; + buf += (sizeof(size_t) + encode_size); + } + + // values uint64_t value_len = data_column.get_chars().size(); *reinterpret_cast<uint64_t*>(buf) = value_len; buf += sizeof(uint64_t); - // values - memcpy(buf, data_column.get_chars().data(), value_len); - buf += value_len; - + if (value_len <= ENCODE_SIZE_LIMIT) { + memcpy(buf, data_column.get_chars().data(), value_len); + buf += value_len; + return buf; + } + auto encode_size = + streamvbyte_encode(reinterpret_cast<const uint32_t*>(data_column.get_chars().data()), + upper_int32(value_len), (uint8_t*)(buf + sizeof(size_t))); + *reinterpret_cast<size_t*>(buf) = encode_size; + buf += (sizeof(size_t) + encode_size); return buf; } @@ -140,44 +143,34 @@ const char* DataTypeString::deserialize(const char* buf, IColumn* column, ColumnString::Chars& data = column_string->get_chars(); ColumnString::Offsets& offsets = column_string->get_offsets(); - if (be_exec_version == 0) { - // row num - IColumn::Offset row_num = *reinterpret_cast<const IColumn::Offset*>(buf); - buf += sizeof(IColumn::Offset); - // offsets - offsets.resize(row_num); - for (int i = 0; i < row_num; i++) { - offsets[i] = *reinterpret_cast<const IColumn::Offset*>(buf) - i - 1; - buf += sizeof(IColumn::Offset); - } - // total length - uint64_t value_len = *reinterpret_cast<const uint64_t*>(buf); - buf += sizeof(uint64_t); - // values - data.resize(value_len - row_num); - for (int i = 0; i < row_num; i++) { - memcpy(data.data() + offsets[i - 1], buf, offsets[i] - offsets[i - 1]); - buf += offsets[i] - offsets[i - 1] + 1; - } - - return buf; - } - - // row num - IColumn::Offset row_num = *reinterpret_cast<const IColumn::Offset*>(buf); - buf += sizeof(IColumn::Offset); + uint32_t mem_size = *reinterpret_cast<const uint32_t*>(buf); + buf += sizeof(uint32_t); + offsets.resize(mem_size / sizeof(IColumn::Offset)); // offsets - offsets.resize(row_num); - memcpy(offsets.data(), buf, sizeof(IColumn::Offset) * row_num); - buf += sizeof(IColumn::Offset) * row_num; + if (mem_size <= ENCODE_SIZE_LIMIT) { + memcpy(offsets.data(), buf, mem_size); + buf += mem_size; + } else { + size_t encode_size = *reinterpret_cast<const size_t*>(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(offsets.data()), upper_int32(mem_size)); + buf += encode_size; + } // total length uint64_t value_len = *reinterpret_cast<const uint64_t*>(buf); buf += sizeof(uint64_t); - // values data.resize(value_len); - memcpy(data.data(), buf, value_len); - buf += value_len; + // offsets + if (value_len <= ENCODE_SIZE_LIMIT) { + memcpy(data.data(), buf, value_len); + buf += value_len; + } else { + size_t encode_size = *reinterpret_cast<const size_t*>(buf); + buf += sizeof(size_t); + streamvbyte_decode((const uint8_t*)buf, (uint32_t*)(data.data()), upper_int32(value_len)); + buf += encode_size; + } return buf; } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org