This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 873d39e96e9 branch-3.1:[enhancement](parquet)Optimize the performance of parquet reader when decode RLE_DICTIONARY encoding (#57208) (#57614)
873d39e96e9 is described below
commit 873d39e96e9b88e1ea7e4f978aea5ed325ebfb79
Author: daidai <[email protected]>
AuthorDate: Tue Nov 4 10:09:06 2025 +0800
branch-3.1:[enhancement](parquet)Optimize the performance of parquet reader when decode RLE_DICTIONARY encoding (#57208) (#57614)
bp #57208
---
be/src/vec/exec/format/parquet/decoder.cpp | 12 +++-
.../format/parquet/fix_length_dict_decoder.hpp | 64 +++++++++++++++++++---
.../parquet/fix_length_dict_decoder_test.cpp | 4 +-
3 files changed, 70 insertions(+), 10 deletions(-)
diff --git a/be/src/vec/exec/format/parquet/decoder.cpp b/be/src/vec/exec/format/parquet/decoder.cpp
index 335f3029da4..6a1a319d8c6 100644
--- a/be/src/vec/exec/format/parquet/decoder.cpp
+++ b/be/src/vec/exec/format/parquet/decoder.cpp
@@ -63,12 +63,22 @@ Status Decoder::get_decoder(tparquet::Type::type type, tparquet::Encoding::type
decoder.reset(new ByteArrayDictDecoder());
break;
case tparquet::Type::INT32:
+ decoder.reset(new FixLengthDictDecoder<tparquet::Type::INT32>());
+ break;
case tparquet::Type::INT64:
+ decoder.reset(new FixLengthDictDecoder<tparquet::Type::INT64>());
+ break;
case tparquet::Type::INT96:
+ decoder.reset(new FixLengthDictDecoder<tparquet::Type::INT96>());
+ break;
case tparquet::Type::FLOAT:
+ decoder.reset(new FixLengthDictDecoder<tparquet::Type::FLOAT>());
+ break;
case tparquet::Type::DOUBLE:
+ decoder.reset(new FixLengthDictDecoder<tparquet::Type::DOUBLE>());
+ break;
case tparquet::Type::FIXED_LEN_BYTE_ARRAY:
- decoder.reset(new FixLengthDictDecoder());
+ decoder.reset(new FixLengthDictDecoder<tparquet::Type::FIXED_LEN_BYTE_ARRAY>());
break;
default:
return Status::InternalError("Unsupported type {}(encoding={}) in parquet decoder",
diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
index fba7e0826ae..ef4f038e64f 100644
--- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
@@ -25,8 +25,43 @@
namespace doris::vectorized {
+template <tparquet::Type::type type>
+struct PhysicalTypeTraits {};
+
+template <>
+struct PhysicalTypeTraits<tparquet::Type::INT32> {
+ using CppType = int32_t;
+};
+
+template <>
+struct PhysicalTypeTraits<tparquet::Type::INT64> {
+ using CppType = int64_t;
+};
+
+template <>
+struct PhysicalTypeTraits<tparquet::Type::INT96> {
+ using CppType = ParquetInt96;
+};
+
+template <>
+struct PhysicalTypeTraits<tparquet::Type::FLOAT> {
+ using CppType = float;
+};
+
+template <>
+struct PhysicalTypeTraits<tparquet::Type::DOUBLE> {
+ using CppType = double;
+};
+
+template <>
+struct PhysicalTypeTraits<tparquet::Type::FIXED_LEN_BYTE_ARRAY> {
+ using CppType = Slice;
+};
+
+template <tparquet::Type::type PhysicalType>
class FixLengthDictDecoder final : public BaseDictDecoder {
public:
+ using cppType = PhysicalTypeTraits<PhysicalType>::CppType;
FixLengthDictDecoder() = default;
~FixLengthDictDecoder() override = default;
@@ -46,9 +81,12 @@ public:
if (doris_column->is_column_dictionary() &&
assert_cast<ColumnDictI32&>(*doris_column).dict_size() == 0) {
std::vector<StringRef> dict_items;
+
+ char* dict_item_address = (char*)_dict.get();
dict_items.reserve(_dict_items.size());
for (int i = 0; i < _dict_items.size(); ++i) {
- dict_items.emplace_back(_dict_items[i], _type_length);
+ dict_items.emplace_back(dict_item_address, _type_length);
+ dict_item_address += _type_length;
}
assert_cast<ColumnDictI32&>(*doris_column)
.insert_many_dict_data(dict_items.data(), dict_items.size());
@@ -79,8 +117,12 @@ protected:
switch (read_type) {
case ColumnSelectVector::CONTENT: {
for (size_t i = 0; i < run_length; ++i) {
- memcpy(raw_data + data_index, _dict_items[_indexes[dict_index++]],
- _type_length);
+ if constexpr (PhysicalType == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ auto& slice = _dict_items[_indexes[dict_index++]];
+ memcpy(raw_data + data_index, slice.get_data(), _type_length);
+ } else {
+ *(cppType*)(raw_data + data_index) = _dict_items[_indexes[dict_index++]];
+ }
data_index += _type_length;
}
break;
@@ -113,7 +155,11 @@ protected:
char* dict_item_address = reinterpret_cast<char*>(_dict.get());
_dict_items.resize(num_values);
for (size_t i = 0; i < num_values; ++i) {
- _dict_items[i] = dict_item_address;
+ if constexpr (PhysicalType == tparquet::Type::FIXED_LEN_BYTE_ARRAY) {
+ _dict_items[i] = Slice {dict_item_address, (size_t)_type_length};
+ } else {
+ _dict_items[i] = *((cppType*)dict_item_address);
+ }
dict_item_address += _type_length;
}
return Status::OK();
@@ -123,8 +169,10 @@ protected:
size_t dict_items_size = _dict_items.size();
std::vector<StringRef> dict_values;
dict_values.reserve(dict_items_size);
+ auto* dict_item_address = (const char*)_dict.get();
for (size_t i = 0; i < dict_items_size; ++i) {
- dict_values.emplace_back(_dict_items[i], _type_length);
+ dict_values.emplace_back(dict_item_address, _type_length);
+ dict_item_address += _type_length;
}
doris_column->insert_many_strings(&dict_values[0], dict_items_size);
return Status::OK();
@@ -135,14 +183,16 @@ protected:
std::vector<StringRef> dict_values;
dict_values.reserve(dict_column->size());
const auto& data = dict_column->get_data();
+ auto* dict_item_address = (const char*)_dict.get();
+
for (size_t i = 0; i < dict_column->size(); ++i) {
- dict_values.emplace_back(_dict_items[data[i]], _type_length);
+ dict_values.emplace_back(dict_item_address + data[i] * _type_length, _type_length);
}
res->insert_many_strings(&dict_values[0], dict_values.size());
return res;
}
// For dictionary encoding
- std::vector<char*> _dict_items;
+ std::vector<typename PhysicalTypeTraits<PhysicalType>::CppType> _dict_items;
};
} // namespace doris::vectorized
diff --git a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
index a19f2b071a3..65b5e9ed033 100644
--- a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
+++ b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
@@ -43,7 +43,7 @@ protected:
ASSERT_TRUE(_decoder.set_dict(dict_data, dict_data_size, dict_size).ok());
}
- FixLengthDictDecoder _decoder;
+ FixLengthDictDecoder<tparquet::Type::FIXED_LEN_BYTE_ARRAY> _decoder;
size_t _type_length;
};
@@ -199,7 +199,7 @@ TEST_F(FixLengthDictDecoderTest, test_decode_with_filter_and_null) {
// Test empty dictionary case
TEST_F(FixLengthDictDecoderTest, test_empty_dict) {
- FixLengthDictDecoder empty_decoder;
+ FixLengthDictDecoder<tparquet::Type::INT32> empty_decoder;
empty_decoder.set_type_length(sizeof(int32_t));
auto dict_data = std::make_unique<uint8_t[]>(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]