This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 270c7ddc90a [opt](be) Use a custom allocator to allocate memory in order to avoid untracked memory usage (#55749)
270c7ddc90a is described below
commit 270c7ddc90a3e6c29ef08676d59ed6b7c592dcb9
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Sep 10 11:43:30 2025 +0800
[opt](be) Use a custom allocator to allocate memory in order to avoid untracked memory usage (#55749)
---
be/src/io/fs/broker_file_system.cpp | 6 +-
be/src/io/fs/buffered_reader.cpp | 4 +-
be/src/io/fs/buffered_reader.h | 3 +-
be/src/io/fs/stream_load_pipe.cpp | 9 ++-
be/src/io/fs/stream_load_pipe.h | 16 ++--
be/src/olap/tablet.cpp | 3 +-
be/src/util/faststring.cc | 6 +-
be/src/util/slice.h | 2 +-
be/src/vec/common/allocator.cpp | 2 +-
be/src/vec/common/allocator.h | 2 +-
be/src/vec/common/custom_allocator.h | 74 ++++++++++++++++++
.../file_reader/new_plain_binary_line_reader.h | 3 +-
be/src/vec/exec/format/json/new_json_reader.cpp | 14 ++--
be/src/vec/exec/format/json/new_json_reader.h | 7 +-
be/src/vec/exec/format/orc/orc_file_reader.cpp | 3 +-
be/src/vec/exec/format/orc/orc_file_reader.h | 3 +-
.../format/parquet/byte_array_dict_decoder.cpp | 2 +-
.../exec/format/parquet/byte_array_dict_decoder.h | 3 +-
be/src/vec/exec/format/parquet/decoder.h | 6 +-
.../format/parquet/fix_length_dict_decoder.hpp | 3 +-
.../vec/exec/format/parquet/parquet_thrift_util.h | 5 +-
.../parquet/vparquet_column_chunk_reader.cpp | 6 +-
.../format/parquet/vparquet_column_chunk_reader.h | 2 +-
.../exec/format/parquet/vparquet_group_reader.cpp | 9 ++-
.../exec/format/parquet/vparquet_group_reader.h | 3 +-
be/src/vec/functions/function_jsonb.cpp | 4 +-
be/test/vec/common/custom_allocator_test.cpp | 90 ++++++++++++++++++++++
.../parquet/byte_array_dict_decoder_test.cpp | 5 +-
.../parquet/fix_length_dict_decoder_test.cpp | 5 +-
29 files changed, 242 insertions(+), 58 deletions(-)
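In short, the change pattern is mechanical: call sites swap std::unique_ptr<uint8_t[]>(new uint8_t[n]) or std::make_unique<uint8_t[]>(n) for make_unique_buffer<T>(n), which returns a DorisUniqueBufferPtr<T> whose deleter frees through Allocator<false>, so both the allocation and the release pass through memory tracking. A minimal sketch of the intended usage (illustrative only; the helper name copy_tracked is invented here and is not part of the patch):

    #include <cstdint>
    #include <cstring>

    #include "util/slice.h"
    #include "vec/common/custom_allocator.h"

    namespace doris {

    // Copy a Slice into a freshly allocated, tracker-visible buffer.
    DorisUniqueBufferPtr<uint8_t> copy_tracked(const Slice& src) {
        // The allocation goes through the deleter's Allocator<false> base,
        // so the bytes show up in memory tracking instead of being untracked.
        auto buf = make_unique_buffer<uint8_t>(src.size);
        memcpy(buf.get(), src.data, src.size);
        return buf; // freed through the same allocator when the owner drops it
    }

    } // namespace doris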
diff --git a/be/src/io/fs/broker_file_system.cpp b/be/src/io/fs/broker_file_system.cpp
index 0d6d0d3555a..8b0d5db23e2 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -22,11 +22,10 @@
#include <gen_cpp/TPaloBrokerService.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
-#include <stddef.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransportException.h>
-#include <algorithm>
+#include <cstddef>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <filesystem>
@@ -45,6 +44,7 @@
#include "runtime/broker_mgr.h"
#include "runtime/exec_env.h"
#include "util/slice.h"
+#include "vec/common/custom_allocator.h"
namespace doris::io {
@@ -405,7 +405,7 @@ Status BrokerFileSystem::download_impl(const Path& remote_file, const Path& loca
// 4. read remote and write to local
VLOG(2) << "read remote file: " << remote_file << " to local: " << local_file;
constexpr size_t buf_sz = 1024 * 1024;
- std::unique_ptr<uint8_t[]> read_buf(new uint8_t[buf_sz]);
+ auto read_buf = make_unique_buffer<uint8_t>(buf_sz);
size_t cur_offset = 0;
while (true) {
size_t read_len = 0;
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 488e235af12..a22b08c0659 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,6 +23,7 @@
#include <algorithm>
#include <chrono>
+#include <cstdint>
#include <memory>
#include "common/cast_set.h"
@@ -35,6 +36,7 @@
#include "util/runtime_profile.h"
#include "util/slice.h"
#include "util/threadpool.h"
+#include "vec/common/custom_allocator.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -800,7 +802,7 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset
}
size_t buf_size = std::max(_max_buf_size, bytes_to_read);
if (_buf_size < buf_size) {
- std::unique_ptr<uint8_t[]> new_buf(new uint8_t[buf_size]);
+ auto new_buf = make_unique_buffer<uint8_t>(buf_size);
if (offset >= _buf_start_offset && offset < _buf_end_offset) {
memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
_buf_end_offset - offset);
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index f298a3fe166..394c7eb9ad4 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -37,6 +37,7 @@
#include "olap/olap_define.h"
#include "util/runtime_profile.h"
#include "util/slice.h"
+#include "vec/common/custom_allocator.h"
#include "vec/common/typeid_cast.h"
namespace doris {
@@ -655,7 +656,7 @@ protected:
}
private:
- std::unique_ptr<uint8_t[]> _buf;
+ DorisUniqueBufferPtr<uint8_t> _buf;
io::FileReaderSPtr _file;
uint64_t _file_start_offset;
uint64_t _file_end_offset;
diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index 4b204a2aace..d25eb6a839c 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -28,6 +28,7 @@
#include "runtime/exec_env.h"
#include "runtime/thread_context.h"
#include "util/bit_util.h"
+#include "vec/common/custom_allocator.h"
namespace doris {
namespace io {
@@ -90,7 +91,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* byt
// If _total_length == -1, this should be a Kafka routine load task or stream load with chunked transfer HTTP request,
// just get the next buffer directly from the buffer queue, because one buffer contains a complete piece of data.
// Otherwise, this should be a stream load task that needs to read the specified amount of data.
-Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length) {
+Status StreamLoadPipe::read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) {
if (_total_length < -1) {
return Status::InternalError("invalid, _total_length is: {}", _total_length);
} else if (_total_length == 0) {
@@ -104,7 +105,7 @@ Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}
// _total_length > 0, read the entire data
- data->reset(new uint8_t[_total_length]);
+ *data = make_unique_buffer<uint8_t>(_total_length);
Slice result(data->get(), _total_length);
Status st = read_at(0, result, length);
return st;
@@ -163,7 +164,7 @@ Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
}
// read the next buffer from _buf_queue
-Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length) {
+Status StreamLoadPipe::_read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* length) {
std::unique_lock<std::mutex> l(_lock);
while (!_cancelled && !_finished && _buf_queue.empty()) {
_get_cond.wait(l);
@@ -181,7 +182,7 @@ Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_
}
auto buf = _buf_queue.front();
*length = buf->remaining();
- data->reset(new uint8_t[*length]);
+ *data = make_unique_buffer<uint8_t>(*length);
buf->get_bytes((char*)(data->get()), *length);
_buf_queue.pop_front();
_buffered_bytes -= buf->limit;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 4731c620142..cedab0b6c17 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -18,10 +18,10 @@
#pragma once
#include <gen_cpp/internal_service.pb.h>
-#include <stddef.h>
-#include <stdint.h>
#include <condition_variable>
+#include <cstddef>
+#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
@@ -29,14 +29,13 @@
#include "common/status.h"
#include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
#include "io/fs/path.h"
#include "runtime/message_body_sink.h"
#include "util/byte_buffer.h"
#include "util/slice.h"
+#include "vec/common/custom_allocator.h"
-namespace doris {
-namespace io {
+namespace doris::io {
struct IOContext;
static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
@@ -74,7 +73,7 @@ public:
// called when producer/consumer failed
virtual void cancel(const std::string& reason) override;
- Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length);
+ Status read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* length);
size_t get_queue_size() { return _buf_queue.size(); }
@@ -97,7 +96,7 @@ protected:
private:
// read the next buffer from _buf_queue
- Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length);
+ Status _read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* length);
Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0);
@@ -130,5 +129,4 @@ private:
// the data needs to be completely read before it can be parsed.
bool _is_chunked_transfer = false;
};
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 758e1168274..6ef58fa9e99 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -121,6 +121,7 @@
#include "util/work_thread_pool.hpp"
#include "vec/columns/column.h"
#include "vec/columns/column_string.h"
+#include "vec/common/custom_allocator.h"
#include "vec/common/schema_util.h"
#include "vec/common/string_ref.h"
#include "vec/data_types/data_type.h"
@@ -2184,7 +2185,7 @@ Status Tablet::_read_cooldown_meta(const StorageResource& storage_resource,
RETURN_IF_ERROR(storage_resource.fs->open_file(remote_meta_path, &tablet_meta_reader));
auto file_size = tablet_meta_reader->size();
size_t bytes_read;
- auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[file_size]);
+ auto buf = make_unique_buffer<uint8_t>(file_size);
RETURN_IF_ERROR(tablet_meta_reader->read_at(0, {buf.get(), file_size}, &bytes_read));
RETURN_IF_ERROR(tablet_meta_reader->close());
if (!tablet_meta_pb->ParseFromArray(buf.get(), cast_set<int>(file_size))) {
diff --git a/be/src/util/faststring.cc b/be/src/util/faststring.cc
index 046e08c29d8..f9ba31fb523 100644
--- a/be/src/util/faststring.cc
+++ b/be/src/util/faststring.cc
@@ -19,7 +19,7 @@
#include <glog/logging.h>
-#include <memory>
+#include "vec/common/custom_allocator.h"
namespace doris {
@@ -37,7 +37,7 @@ void faststring::GrowToAtLeast(size_t newcapacity) {
void faststring::GrowArray(size_t newcapacity) {
DCHECK_GE(newcapacity, capacity_);
- std::unique_ptr<uint8_t[]> newdata(reinterpret_cast<uint8_t*>(Allocator::alloc(newcapacity)));
+ DorisUniqueBufferPtr<uint8_t> newdata(newcapacity);
if (len_ > 0) {
memcpy(&newdata[0], &data_[0], len_);
}
@@ -62,7 +62,7 @@ void faststring::ShrinkToFitInternal() {
data_ = initial_data_;
capacity_ = kInitialCapacity;
} else {
- std::unique_ptr<uint8_t[]> newdata(reinterpret_cast<uint8_t*>(Allocator::alloc(len_)));
+ DorisUniqueBufferPtr<uint8_t> newdata(len_);
memcpy(&newdata[0], &data_[0], len_);
Allocator::free(data_, capacity_);
data_ = newdata.release();
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index 3aa9a062153..c8b1629b8bd 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -311,7 +311,7 @@ inline int Slice::compare(const Slice& b) const {
}
// A move-only type which manage the lifecycle of externally allocated data.
-// Unlike std::unique_ptr<uint8_t[]>, OwnedSlice remembers the size of data so that clients can access
+// Unlike DorisUniqueBufferPtr<uint8_t>, OwnedSlice remembers the size of data so that clients can access
// the underlying buffer as a Slice.
//
// Usage example:
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 50527cbe32e..21db641335b 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -373,7 +373,7 @@ void* Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator,
template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename MemoryAllocator,
bool check_and_tracking_memory>
void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator,
- check_and_tracking_memory>::free(void* buf, size_t size) {
+ check_and_tracking_memory>::free(void* buf, size_t size) const {
if (use_mmap && size >= doris::config::mmap_threshold) {
if (0 != munmap(buf, size)) {
throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size));
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2a288c65c82..e7ede2adfa9 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -232,7 +232,7 @@ public:
void* realloc(void* buf, size_t old_size, size_t new_size, size_t alignment = 0);
// Free memory range.
- void free(void* buf, size_t size);
+ void free(void* buf, size_t size) const;
void release_unused() { MemoryAllocator::release_unused(); }
diff --git a/be/src/vec/common/custom_allocator.h b/be/src/vec/common/custom_allocator.h
index 0e2cea3b9e7..b37af3e53c5 100644
--- a/be/src/vec/common/custom_allocator.h
+++ b/be/src/vec/common/custom_allocator.h
@@ -17,6 +17,9 @@
#pragma once
+#include <cstddef>
+#include <type_traits>
+
#include "vec/common/allocator.h"
#include "vec/common/allocator_fwd.h"
@@ -33,6 +36,77 @@ template <class Key, class T, class Compare = std::less<Key>,
class Allocator = CustomStdAllocator<std::pair<const Key, T>>>
using DorisMap = std::map<Key, T, Compare, Allocator>;
+template <typename T>
+ requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>)
+struct DorisUniqueBufferDeleter : public Allocator<false> {
+ size_t count = 0;
+
+ DorisUniqueBufferDeleter() = default;
+ DorisUniqueBufferDeleter(size_t n) : count(n) {}
+
+ void operator()(T* ptr) const noexcept {
+ if (ptr) {
+ Allocator::free(ptr, count * sizeof(T));
+ }
+ }
+};
+
+template <typename T>
+ requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>)
+class DorisUniqueBufferPtr {
+public:
+ using Deleter = DorisUniqueBufferDeleter<T>;
+
+ DorisUniqueBufferPtr() = default;
+ explicit DorisUniqueBufferPtr(T* ptr) = delete;
+
+ DorisUniqueBufferPtr(size_t size) {
+ DorisUniqueBufferDeleter<T> deleter(size);
+ void* buf = deleter.alloc(size * sizeof(T));
+ if (!buf) {
+ return;
+ }
+ T* arr = static_cast<T*>(buf);
+ ptr_ = std::unique_ptr<T[], DorisUniqueBufferDeleter<T>>(arr, std::move(deleter));
+ }
+
+ DorisUniqueBufferPtr(std::unique_ptr<T[], Deleter>&& uptr) : ptr_(std::move(uptr)) {}
+ DorisUniqueBufferPtr(DorisUniqueBufferPtr&& other) : ptr_(std::move(other.ptr_)) {}
+
+ DorisUniqueBufferPtr& operator=(DorisUniqueBufferPtr&& other) {
+ if (this != &other) {
+ ptr_ = std::move(other.ptr_);
+ }
+ return *this;
+ }
+
+ DorisUniqueBufferPtr(std::nullptr_t) noexcept {}
+
+ // Delete this function to avoid passing in a pointer allocated by other means(new/malloc).
+ void reset(T*) = delete;
+
+ bool operator==(std::nullptr_t) const noexcept { return ptr_ == nullptr; }
+ bool operator==(T* other) const noexcept { return ptr_.get() == other; }
+
+ void reset() noexcept { ptr_.reset(); }
+
+ auto release() noexcept { return ptr_.release(); }
+
+ T* get() const noexcept { return ptr_.get(); }
+ T& operator*() const noexcept { return *ptr_; }
+ T* operator->() const noexcept { return ptr_.get(); }
+ T& operator[](size_t i) const { return ptr_[i]; }
+
+private:
+ std::unique_ptr<T[], Deleter> ptr_;
+};
+
+template <typename T>
+ requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>)
+DorisUniqueBufferPtr<T> make_unique_buffer(size_t n) {
+ return DorisUniqueBufferPtr<T>(n);
+}
+
// NOTE: Even CustomStdAllocator 's allocate/dallocate could modify memory tracker,but it's still stateless,
// because threadcontext owns the memtracker, not CustomStdAllocator.
template <class T, typename MemoryAllocator>
diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
index e81afb162c3..02a948dc839 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
@@ -24,6 +24,7 @@
#include "common/status.h"
#include "exec/line_reader.h"
#include "io/fs/file_reader_writer_fwd.h"
+#include "vec/common/custom_allocator.h"
namespace doris {
#include "common/compile_check_begin.h"
@@ -51,7 +52,7 @@ public:
private:
io::FileReaderSPtr _file_reader;
- std::unique_ptr<uint8_t[]> _file_buf;
+ DorisUniqueBufferPtr<uint8_t> _file_buf;
std::unique_ptr<PDataRow> _cur_row;
};
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp b/be/src/vec/exec/format/json/new_json_reader.cpp
index e49100f82a1..e75bff2c96f 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -58,6 +58,7 @@
#include "vec/columns/column_string.h"
#include "vec/columns/column_struct.h"
#include "vec/common/assert_cast.h"
+#include "vec/common/custom_allocator.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_array.h"
#include "vec/data_types/data_type_factory.hpp"
@@ -244,7 +245,7 @@ Status NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
std::vector<DataTypePtr>* col_types) {
bool eof = false;
const uint8_t* json_str = nullptr;
- std::unique_ptr<uint8_t[]> json_str_ptr;
+ DorisUniqueBufferPtr<uint8_t> json_str_ptr;
size_t size = 0;
if (_line_reader != nullptr) {
RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, _io_ctx));
@@ -474,7 +475,7 @@ Status NewJsonReader::_read_json_column(RuntimeState* state, Block& block,
return (this->*_vhandle_json_callback)(state, block, slot_descs, is_empty_row, eof);
}
-Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size) {
+Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf,
+ size_t* read_size) {
switch (_params.file_type) {
case TFileType::FILE_LOCAL:
[[fallthrough]];
@@ -482,7 +484,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
[[fallthrough]];
case TFileType::FILE_S3: {
size_t file_size = _file_reader->size();
- file_buf->reset(new uint8_t[file_size]);
+ *file_buf = make_unique_buffer<uint8_t>(file_size);
Slice result(file_buf->get(), file_size);
RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, read_size, _io_ctx));
_current_offset += *read_size;
@@ -499,7 +501,7 @@ Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
return Status::OK();
}
-Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf,
+Status NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf,
size_t* read_size) {
auto* stream_load_pipe = dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
@@ -515,7 +517,7 @@ Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi
uint64_t cur_size = 0;
// second read: continuously read data from the pipe until all data is read.
- std::unique_ptr<uint8_t[]> read_buf;
+ DorisUniqueBufferPtr<uint8_t> read_buf;
size_t read_buf_size = 0;
while (true) {
RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, &read_buf_size));
@@ -534,7 +536,7 @@ Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi
return Status::OK();
}
- std::unique_ptr<uint8_t[]> total_buf = std::make_unique<uint8_t[]>(cur_size + *read_size);
+ DorisUniqueBufferPtr<uint8_t> total_buf = make_unique_buffer<uint8_t>(cur_size + *read_size);
// copy the data during the first read
memcpy(total_buf.get(), file_buf->get(), *read_size);
diff --git a/be/src/vec/exec/format/json/new_json_reader.h b/be/src/vec/exec/format/json/new_json_reader.h
index f537f899eb3..283b1c6251e 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -37,6 +37,7 @@
#include "io/file_factory.h"
#include "io/fs/file_reader_writer_fwd.h"
#include "util/runtime_profile.h"
+#include "vec/common/custom_allocator.h"
#include "vec/common/string_ref.h"
#include "vec/core/types.h"
#include "vec/exec/format/generic_reader.h"
@@ -102,11 +103,11 @@ private:
const std::vector<SlotDescriptor*>& slot_descs,
bool* is_empty_row,
bool* eof);
- Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
+ Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
// StreamLoadPipe::read_one_message only reads a portion of the data when stream loading with a chunked transfer HTTP request.
// Need to read all the data before performing JSON parsing.
- Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, size_t* read_size);
+ Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* read_size);
// simdjson, replace none simdjson function if it is ready
Status _simdjson_init_reader();
@@ -239,7 +240,7 @@ private:
/// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
std::vector<UInt8> _seen_columns;
// simdjson
- std::unique_ptr<uint8_t[]> _json_str_ptr;
+ DorisUniqueBufferPtr<uint8_t> _json_str_ptr;
const uint8_t* _json_str = nullptr;
static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.cpp b/be/src/vec/exec/format/orc/orc_file_reader.cpp
index 6f1411563e7..72c9cce6215 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.cpp
+++ b/be/src/vec/exec/format/orc/orc_file_reader.cpp
@@ -18,6 +18,7 @@
#include "vec/exec/format/orc/orc_file_reader.h"
#include "util/runtime_profile.h"
+#include "vec/common/custom_allocator.h"
namespace doris {
namespace vectorized {
@@ -60,7 +61,7 @@ Status OrcMergeRangeFileReader::read_at_impl(size_t offset, Slice result, size_t
if (_cache == nullptr) {
auto range_size = _range.end_offset - _range.start_offset;
- _cache = std::make_unique<char[]>(range_size);
+ _cache = make_unique_buffer<char>(range_size);
{
SCOPED_RAW_TIMER(&_statistics.read_time);
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h b/be/src/vec/exec/format/orc/orc_file_reader.h
index d9d90f3e6e4..503777e67c2 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.h
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -19,6 +19,7 @@
#include "io/fs/buffered_reader.h"
#include "io/fs/file_reader.h"
+#include "vec/common/custom_allocator.h"
namespace doris {
namespace vectorized {
@@ -75,7 +76,7 @@ private:
io::FileReaderSPtr _inner_reader;
io::PrefetchRange _range;
- std::unique_ptr<char[]> _cache;
+ DorisUniqueBufferPtr<char> _cache;
int64_t _current_start_offset = -1;
size_t _size;
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
index 1693d5fab68..49ab5cd584b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
@@ -29,7 +29,7 @@
namespace doris::vectorized {
#include "common/compile_check_begin.h"
-Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length,
+Status ByteArrayDictDecoder::set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
size_t num_values) {
_dict = std::move(dict);
if (_dict == nullptr) {
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
index 3a87801ac4b..762a9c5b885 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
@@ -49,7 +49,8 @@ public:
Status _decode_values(MutableColumnPtr& doris_column, DataTypePtr& data_type,
ColumnSelectVector& select_vector, bool is_dict_filter);
- Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override;
+ Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
+ size_t num_values) override;
Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
diff --git a/be/src/vec/exec/format/parquet/decoder.h b/be/src/vec/exec/format/parquet/decoder.h
index 5b0098324c2..81f328ded43 100644
--- a/be/src/vec/exec/format/parquet/decoder.h
+++ b/be/src/vec/exec/format/parquet/decoder.h
@@ -33,6 +33,7 @@
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
+#include "vec/common/custom_allocator.h"
#include "vec/common/pod_array_fwd.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
@@ -70,7 +71,8 @@ public:
virtual Status skip_values(size_t num_values) = 0;
- virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) {
+ virtual Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
+ size_t num_values) {
return Status::NotSupported("set_dict is not supported");
}
@@ -151,7 +153,7 @@ protected:
}
// For dictionary encoding
- std::unique_ptr<uint8_t[]> _dict;
+ DorisUniqueBufferPtr<uint8_t> _dict;
std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder;
std::vector<uint32_t> _indexes;
};
diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
index 3b9966e9f36..b997c10da51 100644
--- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
@@ -103,7 +103,8 @@ protected:
return Status::OK();
}
- Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t num_values) override {
+ Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
+ size_t num_values) override {
if (num_values * _type_length != length) {
return Status::Corruption("Wrong dictionary data for fixed length type");
}
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 346bbe60c02..658a32416b7 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -28,6 +28,7 @@
#include "olap/iterators.h"
#include "util/coding.h"
#include "util/thrift_util.h"
+#include "vec/common/custom_allocator.h"
#include "vparquet_file_metadata.h"
namespace doris::vectorized {
@@ -62,10 +63,10 @@ static Status parse_thrift_footer(io::FileReaderSPtr file,
return Status::Corruption("Parquet footer size({}) is large than file size({})",
metadata_size, file_size);
}
- std::unique_ptr<uint8_t[]> new_buff;
+ DorisUniqueBufferPtr<uint8_t> new_buff;
uint8_t* meta_ptr;
if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) {
- new_buff.reset(new uint8_t[metadata_size]);
+ new_buff = make_unique_buffer<uint8_t>(metadata_size);
RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - metadata_size,
Slice(new_buff.get(), metadata_size), &bytes_read, io_ctx));
meta_ptr = new_buff.get();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 825e0092f60..5bcf4abf919 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -21,6 +21,7 @@
#include <glog/logging.h>
#include <string.h>
+#include <cstdint>
#include <utility>
#include "common/compiler_util.h" // IWYU pragma: keep
@@ -28,6 +29,7 @@
#include "util/block_compression.h"
#include "util/runtime_profile.h"
#include "vec/columns/column.h"
+#include "vec/common/custom_allocator.h"
#include "vec/exec/format/parquet/decoder.h"
#include "vec/exec/format/parquet/level_decoder.h"
#include "vec/exec/format/parquet/schema_desc.h"
@@ -232,7 +234,7 @@ Status ColumnChunkReader::_decode_dict_page() {
// Prepare dictionary data
int32_t uncompressed_size = header->uncompressed_page_size;
- std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]);
+ auto dict_data = make_unique_buffer<uint8_t>(uncompressed_size);
if (_block_compress_codec != nullptr) {
Slice compressed_data;
RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
@@ -265,7 +267,7 @@ Status ColumnChunkReader::_decode_dict_page() {
void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
if (size > _decompress_buf_size) {
_decompress_buf_size = BitUtil::next_power_of_two(size);
- _decompress_buf.reset(new uint8_t[_decompress_buf_size]);
+ _decompress_buf = make_unique_buffer<uint8_t>(_decompress_buf_size);
}
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index e52973b732e..db0530da597 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -215,7 +215,7 @@ private:
size_t _chunk_parsed_values = 0;
uint32_t _remaining_num_values = 0;
Slice _page_data;
- std::unique_ptr<uint8_t[]> _decompress_buf;
+ DorisUniqueBufferPtr<uint8_t> _decompress_buf;
size_t _decompress_buf_size = 0;
Slice _v2_rep_levels;
Slice _v2_def_levels;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 7d14a87ea1d..96feeceea8d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -44,6 +44,7 @@
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
#include "vec/common/assert_cast.h"
+#include "vec/common/custom_allocator.h"
#include "vec/common/pod_array.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
@@ -565,7 +566,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
}
FilterMap& filter_map = *filter_map_ptr;
- std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
+ DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr;
if (_cached_filtered_rows != 0) {
RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
pre_read_rows += _cached_filtered_rows;
@@ -625,7 +626,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
}
Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
- std::unique_ptr<uint8_t[]>& filter_map_data,
+ DorisUniqueBufferPtr<uint8_t>& filter_map_data,
size_t pre_read_rows) const {
if (_cached_filtered_rows == 0) {
return Status::OK();
@@ -636,8 +637,8 @@ Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
return Status::OK();
}
- uint8_t* map = new uint8_t[total_rows];
- filter_map_data.reset(map);
+ filter_map_data = make_unique_buffer<uint8_t>(total_rows);
+ auto* map = filter_map_data.get();
for (size_t i = 0; i < _cached_filtered_rows; ++i) {
map[i] = 0;
}
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 002163bec9a..9d0a59a4e91 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -192,7 +192,8 @@ private:
size_t batch_size, size_t* read_rows, bool* batch_eof,
FilterMap& filter_map);
Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
- Status _rebuild_filter_map(FilterMap& filter_map, std::unique_ptr<uint8_t[]>& filter_map_data,
+ Status _rebuild_filter_map(FilterMap& filter_map,
+ DorisUniqueBufferPtr<uint8_t>& filter_map_data,
size_t pre_read_rows) const;
Status _fill_partition_columns(
Block* block, size_t rows,
diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp
index 89d5274db92..dfc0101c475 100644
--- a/be/src/vec/functions/function_jsonb.cpp
+++ b/be/src/vec/functions/function_jsonb.cpp
@@ -2007,7 +2007,7 @@ public:
JsonbWriter writer;
struct DocumentBuffer {
- std::unique_ptr<char[]> ptr;
+ DorisUniqueBufferPtr<char> ptr;
size_t size = 0;
size_t capacity = 0;
};
@@ -2099,7 +2099,7 @@ public:
if (writer_output->getSize() > tmp_buffer.capacity) {
tmp_buffer.capacity =
((size_t(writer_output->getSize()) + 1024 - 1) / 1024) * 1024;
- tmp_buffer.ptr = std::make_unique<char[]>(tmp_buffer.capacity);
+ tmp_buffer.ptr = make_unique_buffer<char>(tmp_buffer.capacity);
DCHECK_LE(writer_output->getSize(), tmp_buffer.capacity);
}
diff --git a/be/test/vec/common/custom_allocator_test.cpp b/be/test/vec/common/custom_allocator_test.cpp
new file mode 100644
index 00000000000..23adf86c667
--- /dev/null
+++ b/be/test/vec/common/custom_allocator_test.cpp
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/tests/gtest_pod_array.cpp
+// and modified by Doris
+
+#include "vec/common/custom_allocator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+namespace doris {
+TEST(CustomAllocatorTest, StdAllocatorBasic) {
+ using Alloc = CustomStdAllocator<int32_t>;
+ Alloc alloc;
+
+ int32_t* buf = alloc.allocate(10);
+ for (int i = 0; i < 10; ++i) {
+ buf[i] = i;
+ }
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_EQ(buf[i], i);
+ }
+ alloc.deallocate(buf, 10);
+}
+
+TEST(DorisUniqueBufferPtrTest, Basic) {
+ auto buf = make_unique_buffer<int32_t>(10);
+ for (int32_t i = 0; i != 10; ++i) {
+ buf[i] = i;
+ }
+ for (int32_t i = 0; i != 10; ++i) {
+ ASSERT_EQ(buf[i], i);
+ }
+
+ DorisUniqueBufferPtr<int32_t> buf2 = std::move(buf);
+ for (int32_t i = 0; i != 10; ++i) {
+ ASSERT_EQ(buf2[i], i);
+ }
+
+ ASSERT_EQ(buf, nullptr);
+
+ auto* ptr = buf2.release();
+ for (int32_t i = 0; i != 10; ++i) {
+ ASSERT_EQ(ptr[i], i);
+ }
+
+ ASSERT_EQ(buf2, nullptr);
+ Allocator<false> {}.free(ptr, 10 * sizeof(int32_t));
+
+ DorisUniqueBufferPtr<int32_t> buf3(100);
+ ASSERT_NE(buf3, nullptr);
+
+ buf3.reset();
+ ASSERT_EQ(buf3, nullptr);
+
+ DorisUniqueBufferPtr<int32_t>::Deleter deleter(50);
+ auto unique_ptr = std::unique_ptr<int32_t[], DorisUniqueBufferPtr<int32_t>::Deleter>(
+ static_cast<int32_t*>(deleter.alloc(50 * sizeof(int32_t))), std::move(deleter));
+
+ DorisUniqueBufferPtr<int32_t> buf4(std::move(unique_ptr));
+ ASSERT_NE(buf4, nullptr);
+
+ ptr = buf4.get();
+
+ DorisUniqueBufferPtr<int32_t> buf5(std::move(buf4));
+
+ ASSERT_EQ(ptr, buf5.get());
+ ASSERT_EQ(ptr, buf5);
+
+ DorisUniqueBufferPtr<int32_t> buf6 = nullptr;
+ ASSERT_EQ(buf6, nullptr);
+}
+
+}; // namespace doris
\ No newline at end of file
diff --git a/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp
index a6450e253c0..5115fdf8b74 100644
--- a/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp
+++ b/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp
@@ -26,6 +26,7 @@
#include "vec/columns/column_dictionary.h"
#include "vec/columns/column_string.h"
#include "vec/columns/column_vector.h"
+#include "vec/common/custom_allocator.h"
#include "vec/data_types/data_type_string.h"
namespace doris::vectorized {
@@ -43,7 +44,7 @@ protected:
dict_data_size += 4 + strlen(values[i]); // 4 bytes for length + string data
}
- auto dict_data = std::make_unique<uint8_t[]>(dict_data_size);
+ auto dict_data = make_unique_buffer<uint8_t>(dict_data_size);
size_t offset = 0;
for (int i = 0; i < 3; i++) {
uint32_t len = strlen(values[i]);
@@ -180,7 +181,7 @@ TEST_F(ByteArrayDictDecoderTest, test_decode_with_filter_and_null) {
// Test empty dictionary case
TEST_F(ByteArrayDictDecoderTest, test_empty_dict) {
ByteArrayDictDecoder empty_decoder;
- auto dict_data = std::make_unique<uint8_t[]>(0);
+ auto dict_data = make_unique_buffer<uint8_t>(0);
ASSERT_TRUE(empty_decoder.set_dict(dict_data, 0, 0).ok());
}
diff --git a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
index 187f9ff30dd..1a2302d8558 100644
--- a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
+++ b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
@@ -21,6 +21,7 @@
#include "util/slice.h"
#include "vec/columns/column_vector.h"
+#include "vec/common/custom_allocator.h"
#include "vec/data_types/data_type_number.h"
namespace doris::vectorized {
@@ -33,7 +34,7 @@ protected:
size_t dict_size = 3;
size_t dict_data_size = dict_size * _type_length;
- auto dict_data = std::make_unique<uint8_t[]>(dict_data_size);
+ auto dict_data = make_unique_buffer<uint8_t>(dict_data_size);
const char* values[3] = {"apple ", "banana", "cherry"}; // Dictionary values
for (int i = 0; i < 3; i++) {
memcpy(dict_data.get() + i * _type_length, values[i], _type_length);
@@ -202,7 +203,7 @@ TEST_F(FixLengthDictDecoderTest, test_empty_dict) {
FixLengthDictDecoder empty_decoder;
empty_decoder.set_type_length(sizeof(int32_t));
- auto dict_data = std::make_unique<uint8_t[]>(0);
+ auto dict_data = make_unique_buffer<uint8_t>(0);
ASSERT_TRUE(empty_decoder.set_dict(dict_data, 0, 0).ok());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]