This is an automated email from the ASF dual-hosted git repository. mrhhsg pushed a commit to tag selectdb-doris-2.1.10-jxrt-20250908 in repository https://gitbox.apache.org/repos/asf/doris.git
commit 0d551d63332599c8e4fcac94a9698797cd53ae4c Author: Hu Shenggang <[email protected]> AuthorDate: Sun Sep 7 16:34:23 2025 +0800 [opt](be) Use a custom allocator to allocate memory in order to avoid untracked memory usage --- be/src/io/fs/broker_file_system.cpp | 6 +- be/src/io/fs/buffered_reader.cpp | 4 +- be/src/io/fs/buffered_reader.h | 3 +- be/src/io/fs/stream_load_pipe.cpp | 9 +-- be/src/io/fs/stream_load_pipe.h | 16 +++-- be/src/olap/tablet.cpp | 3 +- be/src/util/faststring.cc | 10 ++-- be/src/util/slice.h | 2 +- be/src/vec/common/allocator.cpp | 2 +- be/src/vec/common/allocator.h | 2 +- be/src/vec/common/custom_allocator.h | 70 ++++++++++++++++++++++ .../file_reader/new_plain_binary_line_reader.h | 3 +- be/src/vec/exec/format/json/new_json_reader.cpp | 14 +++-- be/src/vec/exec/format/json/new_json_reader.h | 7 ++- be/src/vec/exec/format/orc/orc_file_reader.cpp | 3 +- be/src/vec/exec/format/orc/orc_file_reader.h | 3 +- .../format/parquet/byte_array_dict_decoder.cpp | 2 +- .../exec/format/parquet/byte_array_dict_decoder.h | 3 +- be/src/vec/exec/format/parquet/decoder.h | 6 +- .../format/parquet/fix_length_dict_decoder.hpp | 3 +- .../vec/exec/format/parquet/parquet_thrift_util.h | 5 +- .../parquet/vparquet_column_chunk_reader.cpp | 6 +- .../format/parquet/vparquet_column_chunk_reader.h | 2 +- .../exec/format/parquet/vparquet_group_reader.cpp | 9 +-- .../exec/format/parquet/vparquet_group_reader.h | 3 +- be/src/vec/functions/function_jsonb.cpp | 4 +- .../parquet/byte_array_dict_decoder_test.cpp | 5 +- .../parquet/fix_length_dict_decoder_test.cpp | 5 +- 28 files changed, 150 insertions(+), 60 deletions(-) diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp index 0d6d0d3555a..8b0d5db23e2 100644 --- a/be/src/io/fs/broker_file_system.cpp +++ b/be/src/io/fs/broker_file_system.cpp @@ -22,11 +22,10 @@ #include <gen_cpp/TPaloBrokerService.h> #include <gen_cpp/Types_types.h> #include <glog/logging.h> -#include <stddef.h> #include 
<thrift/Thrift.h> #include <thrift/transport/TTransportException.h> -#include <algorithm> +#include <cstddef> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <filesystem> @@ -45,6 +44,7 @@ #include "runtime/broker_mgr.h" #include "runtime/exec_env.h" #include "util/slice.h" +#include "vec/common/custom_allocator.h" namespace doris::io { @@ -405,7 +405,7 @@ Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& loca // 4. read remote and write to local VLOG(2) << "read remote file: " << remote_file << " to local: " << local_file; constexpr size_t buf_sz = 1024 * 1024; - std::unique_ptr<uint8_t[]> read_buf(new uint8_t[buf_sz]); + auto read_buf = make_unique_buffer<uint8_t>(buf_sz); size_t cur_offset = 0; while (true) { size_t read_len = 0; diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp index 488e235af12..a22b08c0659 100644 --- a/be/src/io/fs/buffered_reader.cpp +++ b/be/src/io/fs/buffered_reader.cpp @@ -23,6 +23,7 @@ #include <algorithm> #include <chrono> +#include <cstdint> #include <memory> #include "common/cast_set.h" @@ -35,6 +36,7 @@ #include "util/runtime_profile.h" #include "util/slice.h" #include "util/threadpool.h" +#include "vec/common/custom_allocator.h" namespace doris { #include "common/compile_check_begin.h" @@ -800,7 +802,7 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset } size_t buf_size = std::max(_max_buf_size, bytes_to_read); if (_buf_size < buf_size) { - std::unique_ptr<uint8_t[]> new_buf(new uint8_t[buf_size]); + auto new_buf = make_unique_buffer<uint8_t>(buf_size); if (offset >= _buf_start_offset && offset < _buf_end_offset) { memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset); diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h index f298a3fe166..394c7eb9ad4 100644 --- a/be/src/io/fs/buffered_reader.h +++ b/be/src/io/fs/buffered_reader.h @@ -37,6 
+37,7 @@ #include "olap/olap_define.h" #include "util/runtime_profile.h" #include "util/slice.h" +#include "vec/common/custom_allocator.h" #include "vec/common/typeid_cast.h" namespace doris { @@ -655,7 +656,7 @@ protected: } private: - std::unique_ptr<uint8_t[]> _buf; + DorisUniqueBufferPtr<uint8_t> _buf; io::FileReaderSPtr _file; uint64_t _file_start_offset; uint64_t _file_end_offset; diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp index 4b204a2aace..d25eb6a839c 100644 --- a/be/src/io/fs/stream_load_pipe.cpp +++ b/be/src/io/fs/stream_load_pipe.cpp @@ -28,6 +28,7 @@ #include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/bit_util.h" +#include "vec/common/custom_allocator.h" namespace doris { namespace io { @@ -90,7 +91,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt // If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request, // just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data. // Otherwise, this should be a stream load task that needs to read the specified amount of data. 
-Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) { +Status StreamLoadPipe::read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) { if (_total_length < -1) { return Status::InternalError("invalid, _total_length is: {}", _total_length); } else if (_total_length == 0) { @@ -104,7 +105,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t } // _total_length > 0, read the entire data - data->reset(new uint8_t[_total_length]); + *data = make_unique_buffer<uint8_t>(_total_length); Slice result(data->get(), _total_length); Status st = read_at(0, result, length); return st; @@ -163,7 +164,7 @@ Status StreamLoadPipe::append(const ByteBufferPtr& buf) { } // read the next buffer from _buf_queue -Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) { +Status StreamLoadPipe::_read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) { std::unique_lock<std::mutex> l(_lock); while (!_cancelled && !_finished && _buf_queue.empty()) { _get_cond.wait(l); @@ -181,7 +182,7 @@ Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_ } auto buf = _buf_queue.front(); *length = buf->remaining(); - data->reset(new uint8_t[*length]); + *data = make_unique_buffer<uint8_t>(*length); buf->get_bytes((char*)(data->get()), *length); _buf_queue.pop_front(); _buffered_bytes -= buf->limit; diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h index 4731c620142..cedab0b6c17 100644 --- a/be/src/io/fs/stream_load_pipe.h +++ b/be/src/io/fs/stream_load_pipe.h @@ -18,10 +18,10 @@ #pragma once #include <gen_cpp/internal_service.pb.h> -#include <stddef.h> -#include <stdint.h> #include <condition_variable> +#include <cstddef> +#include <cstdint> #include <deque> #include <memory> #include <mutex> @@ -29,14 +29,13 @@ #include "common/status.h" #include "io/fs/file_reader.h" -#include "io/fs/file_system.h" #include 
"io/fs/path.h" #include "runtime/message_body_sink.h" #include "util/byte_buffer.h" #include "util/slice.h" +#include "vec/common/custom_allocator.h" -namespace doris { -namespace io { +namespace doris::io { struct IOContext; static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024; @@ -74,7 +73,7 @@ public: // called when producer/consumer failed virtual void cancel(const std::string& reason) override; - Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length); + Status read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* length); size_t get_queue_size() { return _buf_queue.size(); } @@ -97,7 +96,7 @@ protected: private: // read the next buffer from _buf_queue - Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length); + Status _read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* length); Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0); @@ -130,5 +129,4 @@ private: // the data needs to be completely read before it can be parsed. 
bool _is_chunked_transfer = false; }; -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index 758e1168274..6ef58fa9e99 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -121,6 +121,7 @@ #include "util/work_thread_pool.hpp" #include "vec/columns/column.h" #include "vec/columns/column_string.h" +#include "vec/common/custom_allocator.h" #include "vec/common/schema_util.h" #include "vec/common/string_ref.h" #include "vec/data_types/data_type.h" @@ -2184,7 +2185,7 @@ Status Tablet::_read_cooldown_meta(const StorageResource& storage_resource, RETURN_IF_ERROR(storage_resource.fs->open_file(remote_meta_path, &tablet_meta_reader)); auto file_size = tablet_meta_reader->size(); size_t bytes_read; - auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[file_size]); + auto buf = make_unique_buffer<uint8_t>(file_size); RETURN_IF_ERROR(tablet_meta_reader->read_at(0, {buf.get(), file_size}, &bytes_read)); RETURN_IF_ERROR(tablet_meta_reader->close()); if (!tablet_meta_pb->ParseFromArray(buf.get(), cast_set<int>(file_size))) { diff --git a/be/src/util/faststring.cc b/be/src/util/faststring.cc index 046e08c29d8..47b07cdeb70 100644 --- a/be/src/util/faststring.cc +++ b/be/src/util/faststring.cc @@ -19,7 +19,7 @@ #include <glog/logging.h> -#include <memory> +#include "vec/common/custom_allocator.h" namespace doris { @@ -37,7 +37,7 @@ void faststring::GrowToAtLeast(size_t newcapacity) { void faststring::GrowArray(size_t newcapacity) { DCHECK_GE(newcapacity, capacity_); - std::unique_ptr<uint8_t[]> newdata(reinterpret_cast<uint8_t*>(Allocator::alloc(newcapacity))); + auto* newdata = reinterpret_cast<uint8_t*>(Allocator::alloc(newcapacity)); if (len_ > 0) { memcpy(&newdata[0], &data_[0], len_); } @@ -48,7 +48,7 @@ void faststring::GrowArray(size_t newcapacity) { ASAN_POISON_MEMORY_REGION(initial_data_, arraysize(initial_data_)); } - data_ = newdata.release(); + data_ = newdata; capacity_ = 
newcapacity; ASAN_POISON_MEMORY_REGION(data_ + len_, capacity_ - len_); } @@ -62,10 +62,10 @@ void faststring::ShrinkToFitInternal() { data_ = initial_data_; capacity_ = kInitialCapacity; } else { - std::unique_ptr<uint8_t[]> newdata(reinterpret_cast<uint8_t*>(Allocator::alloc(len_))); + auto* newdata = reinterpret_cast<uint8_t*>(Allocator::alloc(len_)); memcpy(&newdata[0], &data_[0], len_); Allocator::free(data_, capacity_); - data_ = newdata.release(); + data_ = newdata; capacity_ = len_; } } diff --git a/be/src/util/slice.h b/be/src/util/slice.h index 3aa9a062153..c8b1629b8bd 100644 --- a/be/src/util/slice.h +++ b/be/src/util/slice.h @@ -311,7 +311,7 @@ inline int Slice::compare(const Slice& b) const { } // A move-only type which manage the lifecycle of externally allocated data. -// Unlike std::unique_ptr<uint8_t[]>, OwnedSlice remembers the size of data so that clients can access +// Unlike DorisUniqueBufferPtr<uint8_t>, OwnedSlice remembers the size of data so that clients can access // the underlying buffer as a Slice. 
// // Usage example: diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp index 50527cbe32e..21db641335b 100644 --- a/be/src/vec/common/allocator.cpp +++ b/be/src/vec/common/allocator.cpp @@ -373,7 +373,7 @@ void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator, template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator, bool check_and_tracking_memory> void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator, - check_and_tracking_memory>::free(void* buf, size_t size) { + check_and_tracking_memory>::free(void* buf, size_t size) const { if (use_mmap && size >= doris::config::mmap_threshold) { if (0 != munmap(buf, size)) { throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size)); diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h index 2a288c65c82..e7ede2adfa9 100644 --- a/be/src/vec/common/allocator.h +++ b/be/src/vec/common/allocator.h @@ -232,7 +232,7 @@ public: void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0); // Free memory range. 
- void free(void* buf, size_t size); + void free(void* buf, size_t size) const; void release_unused() { MemoryAllocator::release_unused(); } diff --git a/be/src/vec/common/custom_allocator.h b/be/src/vec/common/custom_allocator.h index 0e2cea3b9e7..22f032fce95 100644 --- a/be/src/vec/common/custom_allocator.h +++ b/be/src/vec/common/custom_allocator.h @@ -17,6 +17,9 @@ #pragma once +#include <cstddef> +#include <type_traits> + #include "vec/common/allocator.h" #include "vec/common/allocator_fwd.h" @@ -33,6 +36,73 @@ template <class Key, class T, class Compare = std::less<Key>, class Allocator = CustomStdAllocator<std::pair<const Key, T>>> using DorisMap = std::map<Key, T, Compare, Allocator>; +template <typename T> + requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>) +struct DorisUniqueBufferDeleter : public Allocator<false> { + size_t count = 0; + + DorisUniqueBufferDeleter() = default; + DorisUniqueBufferDeleter(size_t n) : count(n) {} + + void operator()(T* ptr) const noexcept { + if (ptr) { + Allocator::free(ptr, count * sizeof(T)); + } + } +}; + +template <typename T> + requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>) +class DorisUniqueBufferPtr { +public: + using Deleter = DorisUniqueBufferDeleter<T>; + + DorisUniqueBufferPtr() = default; + explicit DorisUniqueBufferPtr(T* ptr) = delete; + + DorisUniqueBufferPtr(std::unique_ptr<T[], Deleter>&& uptr) : ptr_(std::move(uptr)) {} + DorisUniqueBufferPtr(DorisUniqueBufferPtr&& other) : ptr_(std::move(other.ptr_)) {} + + DorisUniqueBufferPtr& operator=(DorisUniqueBufferPtr&& other) { + if (this != &other) { + ptr_ = std::move(other.ptr_); + } + return *this; + } + + DorisUniqueBufferPtr(std::nullptr_t) noexcept {} + + void reset(T*) = delete; + + bool operator==(std::nullptr_t) const noexcept { return ptr_ == nullptr; } + bool operator==(T* other) const noexcept { return ptr_ == other; } + + void reset() noexcept { ptr_.reset(); } + + auto release() noexcept { return ptr_.release(); } + + 
T* get() const noexcept { return ptr_.get(); } + T& operator*() const noexcept { return *ptr_; } + T* operator->() const noexcept { return ptr_.get(); } + T& operator[](size_t i) const { return ptr_[i]; } + +private: + std::unique_ptr<T[], Deleter> ptr_; +}; + +template <typename T> + requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>) +DorisUniqueBufferPtr<T> make_unique_buffer(size_t n) { + DorisUniqueBufferDeleter<T> deleter(n); + void* buf = deleter.alloc(n * sizeof(T)); + if (!buf) { + return DorisUniqueBufferPtr<T>(); + } + T* arr = static_cast<T*>(buf); + return DorisUniqueBufferPtr<T>( + std::unique_ptr<T[], DorisUniqueBufferDeleter<T>>(arr, std::move(deleter))); +} + // NOTE: Even CustomStdAllocator 's allocate/dallocate could modify memory tracker,but it's still stateless, // because threadcontext owns the memtracker, not CustomStdAllocator. template <class T, typename MemoryAllocator> diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h index e81afb162c3..02a948dc839 100644 --- a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h +++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h @@ -24,6 +24,7 @@ #include "common/status.h" #include "exec/line_reader.h" #include "io/fs/file_reader_writer_fwd.h" +#include "vec/common/custom_allocator.h" namespace doris { #include "common/compile_check_begin.h" @@ -51,7 +52,7 @@ public: private: io::FileReaderSPtr _file_reader; - std::unique_ptr<uint8_t[]> _file_buf; + DorisUniqueBufferPtr<uint8_t> _file_buf; std::unique_ptr<PDataRow> _cur_row; }; diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp index e49100f82a1..e75bff2c96f 100644 --- a/be/src/vec/exec/format/json/new_json_reader.cpp +++ b/be/src/vec/exec/format/json/new_json_reader.cpp @@ -58,6 +58,7 @@ #include "vec/columns/column_string.h" #include 
"vec/columns/column_struct.h" #include "vec/common/assert_cast.h" +#include "vec/common/custom_allocator.h" #include "vec/core/column_with_type_and_name.h" #include "vec/data_types/data_type_array.h" #include "vec/data_types/data_type_factory.hpp" @@ -244,7 +245,7 @@ Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names, std::vector<DataTypePtr>* col_types) { bool eof = false; const uint8_t* json_str = nullptr; - std::unique_ptr<uint8_t[]> json_str_ptr; + DorisUniqueBufferPtr<uint8_t> json_str_ptr; size_t size = 0; if (_line_reader != nullptr) { RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, _io_ctx)); @@ -474,7 +475,8 @@ Status NewJsonReader::_read_json_column(RuntimeState* state, Block& block, return (this->*_vhandle_json_callback)(state, block, slot_descs, is_empty_row, eof); } -Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size) { +Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, + size_t* read_size) { switch (_params.file_type) { case TFileType::FILE_LOCAL: [[fallthrough]]; @@ -482,7 +484,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si [[fallthrough]]; case TFileType::FILE_S3: { size_t file_size = _file_reader->size(); - file_buf->reset(new uint8_t[file_size]); + *file_buf = make_unique_buffer<uint8_t>(file_size); Slice result(file_buf->get(), file_size); RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, read_size, _io_ctx)); _current_offset += *read_size; @@ -499,7 +501,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si return Status::OK(); } -Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, +Status NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size) { auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get()); @@ -515,7 +517,7 @@ Status 
NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi uint64_t cur_size = 0; // second read: continuously read data from the pipe until all data is read. - std::unique_ptr<uint8_t[]> read_buf; + DorisUniqueBufferPtr<uint8_t> read_buf; size_t read_buf_size = 0; while (true) { RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size)); @@ -534,7 +536,7 @@ Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi return Status::OK(); } - std::unique_ptr<uint8_t[]> total_buf = std::make_unique<uint8_t[]>(cur_size + *read_size); + DorisUniqueBufferPtr<uint8_t> total_buf = make_unique_buffer<uint8_t>(cur_size + *read_size); // copy the data during the first read memcpy(total_buf.get(), file_buf->get(), *read_size); diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h index f537f899eb3..283b1c6251e 100644 --- a/be/src/vec/exec/format/json/new_json_reader.h +++ b/be/src/vec/exec/format/json/new_json_reader.h @@ -37,6 +37,7 @@ #include "io/file_factory.h" #include "io/fs/file_reader_writer_fwd.h" #include "util/runtime_profile.h" +#include "vec/common/custom_allocator.h" #include "vec/common/string_ref.h" #include "vec/core/types.h" #include "vec/exec/format/generic_reader.h" @@ -102,11 +103,11 @@ private: const std::vector<SlotDescriptor*>& slot_descs, bool* is_empty_row, bool* eof); - Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size); + Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size); // StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request. // Need to read all the data before performing JSON parsing. 
- Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size); + Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size); // simdjson, replace none simdjson function if it is ready Status _simdjson_init_reader(); @@ -239,7 +240,7 @@ private: /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name. std::vector<UInt8> _seen_columns; // simdjson - std::unique_ptr<uint8_t[]> _json_str_ptr; + DorisUniqueBufferPtr<uint8_t> _json_str_ptr; const uint8_t* _json_str = nullptr; static constexpr size_t _init_buffer_size = 1024 * 1024 * 8; size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING; diff --git a/be/src/vec/exec/format/orc/orc_file_reader.cpp b/be/src/vec/exec/format/orc/orc_file_reader.cpp index 6f1411563e7..72c9cce6215 100644 --- a/be/src/vec/exec/format/orc/orc_file_reader.cpp +++ b/be/src/vec/exec/format/orc/orc_file_reader.cpp @@ -18,6 +18,7 @@ #include "vec/exec/format/orc/orc_file_reader.h" #include "util/runtime_profile.h" +#include "vec/common/custom_allocator.h" namespace doris { namespace vectorized { @@ -60,7 +61,7 @@ Status OrcMergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t if (_cache == nullptr) { auto range_size = _range.end_offset - _range.start_offset; - _cache = std::make_unique<char[]>(range_size); + _cache = make_unique_buffer<char>(range_size); { SCOPED_RAW_TIMER(&_statistics.read_time); diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h b/be/src/vec/exec/format/orc/orc_file_reader.h index d9d90f3e6e4..503777e67c2 100644 --- a/be/src/vec/exec/format/orc/orc_file_reader.h +++ b/be/src/vec/exec/format/orc/orc_file_reader.h @@ -19,6 +19,7 @@ #include "io/fs/buffered_reader.h" #include "io/fs/file_reader.h" +#include "vec/common/custom_allocator.h" namespace doris { namespace vectorized { @@ -75,7 +76,7 @@ private: io::FileReaderSPtr _inner_reader; io::PrefetchRange _range; - 
std::unique_ptr<char[]> _cache; + DorisUniqueBufferPtr<char> _cache; int64_t _current_start_offset = -1; size_t _size; diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp index 1693d5fab68..49ab5cd584b 100644 --- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp +++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp @@ -29,7 +29,7 @@ namespace doris::vectorized { #include "common/compile_check_begin.h" -Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, +Status ByteArrayDictDecoder::set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length, size_t num_values) { _dict = std::move(dict); if (_dict == nullptr) { diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h index 3a87801ac4b..762a9c5b885 100644 --- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h +++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h @@ -49,7 +49,8 @@ public: Status _decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type, ColumnSelectVector& select_vector, bool is_dict_filter); - Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override; + Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length, + size_t num_values) override; Status read_dict_values_to_column(MutableColumnPtr& doris_column) override; diff --git a/be/src/vec/exec/format/parquet/decoder.h b/be/src/vec/exec/format/parquet/decoder.h index 5b0098324c2..81f328ded43 100644 --- a/be/src/vec/exec/format/parquet/decoder.h +++ b/be/src/vec/exec/format/parquet/decoder.h @@ -33,6 +33,7 @@ #include "vec/columns/column_dictionary.h" #include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" +#include "vec/common/custom_allocator.h" #include "vec/common/pod_array_fwd.h" #include "vec/core/types.h" #include "vec/data_types/data_type.h" 
@@ -70,7 +71,8 @@ public: virtual Status skip_values(size_t num_values) = 0; - virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) { + virtual Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length, + size_t num_values) { return Status::NotSupported("set_dict is not supported"); } @@ -151,7 +153,7 @@ protected: } // For dictionary encoding - std::unique_ptr<uint8_t[]> _dict; + DorisUniqueBufferPtr<uint8_t> _dict; std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder; std::vector<uint32_t> _indexes; }; diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp index 3b9966e9f36..b997c10da51 100644 --- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp +++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp @@ -103,7 +103,8 @@ protected: return Status::OK(); } - Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override { + Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length, + size_t num_values) override { if (num_values * _type_length != length) { return Status::Corruption("Wrong dictionary data for fixed length type"); } diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h index 346bbe60c02..658a32416b7 100644 --- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h +++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h @@ -28,6 +28,7 @@ #include "olap/iterators.h" #include "util/coding.h" #include "util/thrift_util.h" +#include "vec/common/custom_allocator.h" #include "vparquet_file_metadata.h" namespace doris::vectorized { @@ -62,10 +63,10 @@ static Status parse_thrift_footer(io::FileReaderSPtr file, return Status::Corruption("Parquet footer size({}) is large than file size({})", metadata_size, file_size); } - std::unique_ptr<uint8_t[]> new_buff; + DorisUniqueBufferPtr<uint8_t> 
new_buff; uint8_t* meta_ptr; if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) { - new_buff.reset(new uint8_t[metadata_size]); + new_buff = make_unique_buffer<uint8_t>(metadata_size); RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size, Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx)); meta_ptr = new_buff.get(); diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp index 825e0092f60..5bcf4abf919 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp @@ -21,6 +21,7 @@ #include <glog/logging.h> #include <string.h> +#include <cstdint> #include <utility> #include "common/compiler_util.h" // IWYU pragma: keep @@ -28,6 +29,7 @@ #include "util/block_compression.h" #include "util/runtime_profile.h" #include "vec/columns/column.h" +#include "vec/common/custom_allocator.h" #include "vec/exec/format/parquet/decoder.h" #include "vec/exec/format/parquet/level_decoder.h" #include "vec/exec/format/parquet/schema_desc.h" @@ -232,7 +234,7 @@ Status ColumnChunkReader::_decode_dict_page() { // Prepare dictionary data int32_t uncompressed_size = header->uncompressed_page_size; - std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]); + auto dict_data = make_unique_buffer<uint8_t>(uncompressed_size); if (_block_compress_codec != nullptr) { Slice compressed_data; RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data)); @@ -265,7 +267,7 @@ Status ColumnChunkReader::_decode_dict_page() { void ColumnChunkReader::_reserve_decompress_buf(size_t size) { if (size > _decompress_buf_size) { _decompress_buf_size = BitUtil::next_power_of_two(size); - _decompress_buf.reset(new uint8_t[_decompress_buf_size]); + _decompress_buf = make_unique_buffer<uint8_t>(_decompress_buf_size); } } diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h index e52973b732e..db0530da597 100644 --- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h @@ -215,7 +215,7 @@ private: size_t _chunk_parsed_values = 0; uint32_t _remaining_num_values = 0; Slice _page_data; - std::unique_ptr<uint8_t[]> _decompress_buf; + DorisUniqueBufferPtr<uint8_t> _decompress_buf; size_t _decompress_buf_size = 0; Slice _v2_rep_levels; Slice _v2_def_levels; diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp index 7d14a87ea1d..96feeceea8d 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp @@ -44,6 +44,7 @@ #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" #include "vec/common/assert_cast.h" +#include "vec/common/custom_allocator.h" #include "vec/common/pod_array.h" #include "vec/core/block.h" #include "vec/core/column_with_type_and_name.h" @@ -565,7 +566,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re } FilterMap& filter_map = *filter_map_ptr; - std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr; + DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr; if (_cached_filtered_rows != 0) { RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows)); pre_read_rows += _cached_filtered_rows; @@ -625,7 +626,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re } Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map, - std::unique_ptr<uint8_t[]>& filter_map_data, + DorisUniqueBufferPtr<uint8_t>& filter_map_data, size_t pre_read_rows) const { if (_cached_filtered_rows == 0) { return Status::OK(); @@ -636,8 +637,8 @@ Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map, return Status::OK(); } - uint8_t* 
map = new uint8_t[total_rows]; - filter_map_data.reset(map); + filter_map_data = make_unique_buffer<uint8_t>(total_rows); + auto* map = filter_map_data.get(); for (size_t i = 0; i < _cached_filtered_rows; ++i) { map[i] = 0; } diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h index 002163bec9a..9d0a59a4e91 100644 --- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h +++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h @@ -192,7 +192,8 @@ private: size_t batch_size, size_t* read_rows, bool* batch_eof, FilterMap& filter_map); Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof); - Status _rebuild_filter_map(FilterMap& filter_map, std::unique_ptr<uint8_t[]>& filter_map_data, + Status _rebuild_filter_map(FilterMap& filter_map, + DorisUniqueBufferPtr<uint8_t>& filter_map_data, size_t pre_read_rows) const; Status _fill_partition_columns( Block* block, size_t rows, diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp index fc183a9d20b..956612f1d69 100644 --- a/be/src/vec/functions/function_jsonb.cpp +++ b/be/src/vec/functions/function_jsonb.cpp @@ -2008,7 +2008,7 @@ public: JsonbWriter writer; struct DocumentBuffer { - std::unique_ptr<char[]> ptr; + DorisUniqueBufferPtr<char> ptr; size_t size = 0; size_t capacity = 0; }; @@ -2100,7 +2100,7 @@ public: if (writer_output->getSize() > tmp_buffer.capacity) { tmp_buffer.capacity = ((size_t(writer_output->getSize()) + 1024 - 1) / 1024) * 1024; - tmp_buffer.ptr = std::make_unique<char[]>(tmp_buffer.capacity); + tmp_buffer.ptr = make_unique_buffer<char>(tmp_buffer.capacity); DCHECK_LE(writer_output->getSize(), tmp_buffer.capacity); } diff --git a/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp index a6450e253c0..5115fdf8b74 100644 --- 
a/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp +++ b/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp @@ -26,6 +26,7 @@ #include "vec/columns/column_dictionary.h" #include "vec/columns/column_string.h" #include "vec/columns/column_vector.h" +#include "vec/common/custom_allocator.h" #include "vec/data_types/data_type_string.h" namespace doris::vectorized { @@ -43,7 +44,7 @@ protected: dict_data_size += 4 + strlen(values[i]); // 4 bytes for length + string data } - auto dict_data = std::make_unique<uint8_t[]>(dict_data_size); + auto dict_data = make_unique_buffer<uint8_t>(dict_data_size); size_t offset = 0; for (int i = 0; i < 3; i++) { uint32_t len = strlen(values[i]); @@ -180,7 +181,7 @@ TEST_F(ByteArrayDictDecoderTest, test_decode_with_filter_and_null) { // Test empty dictionary case TEST_F(ByteArrayDictDecoderTest, test_empty_dict) { ByteArrayDictDecoder empty_decoder; - auto dict_data = std::make_unique<uint8_t[]>(0); + auto dict_data = make_unique_buffer<uint8_t>(0); ASSERT_TRUE(empty_decoder.set_dict(dict_data, 0, 0).ok()); } diff --git a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp index 187f9ff30dd..1a2302d8558 100644 --- a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp +++ b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp @@ -21,6 +21,7 @@ #include "util/slice.h" #include "vec/columns/column_vector.h" +#include "vec/common/custom_allocator.h" #include "vec/data_types/data_type_number.h" namespace doris::vectorized { @@ -33,7 +34,7 @@ protected: size_t dict_size = 3; size_t dict_data_size = dict_size * _type_length; - auto dict_data = std::make_unique<uint8_t[]>(dict_data_size); + auto dict_data = make_unique_buffer<uint8_t>(dict_data_size); const char* values[3] = {"apple ", "banana", "cherry"}; // Dictionary values for (int i = 0; i < 3; i++) { memcpy(dict_data.get() + i * _type_length, values[i], 
_type_length); @@ -202,7 +203,7 @@ TEST_F(FixLengthDictDecoderTest, test_empty_dict) { FixLengthDictDecoder empty_decoder; empty_decoder.set_type_length(sizeof(int32_t)); - auto dict_data = std::make_unique<uint8_t[]>(0); + auto dict_data = make_unique_buffer<uint8_t>(0); ASSERT_TRUE(empty_decoder.set_dict(dict_data, 0, 0).ok()); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
