This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 270c7ddc90a [opt](be) Use a custom allocator to allocate memory in order to avoid untracked memory usage (#55749)
270c7ddc90a is described below

commit 270c7ddc90a3e6c29ef08676d59ed6b7c592dcb9
Author: Jerry Hu <[email protected]>
AuthorDate: Wed Sep 10 11:43:30 2025 +0800

    [opt](be) Use a custom allocator to allocate memory in order to avoid untracked memory usage (#55749)
---
 be/src/io/fs/broker_file_system.cpp                |  6 +-
 be/src/io/fs/buffered_reader.cpp                   |  4 +-
 be/src/io/fs/buffered_reader.h                     |  3 +-
 be/src/io/fs/stream_load_pipe.cpp                  |  9 ++-
 be/src/io/fs/stream_load_pipe.h                    | 16 ++--
 be/src/olap/tablet.cpp                             |  3 +-
 be/src/util/faststring.cc                          |  6 +-
 be/src/util/slice.h                                |  2 +-
 be/src/vec/common/allocator.cpp                    |  2 +-
 be/src/vec/common/allocator.h                      |  2 +-
 be/src/vec/common/custom_allocator.h               | 74 ++++++++++++++++++
 .../file_reader/new_plain_binary_line_reader.h     |  3 +-
 be/src/vec/exec/format/json/new_json_reader.cpp    | 14 ++--
 be/src/vec/exec/format/json/new_json_reader.h      |  7 +-
 be/src/vec/exec/format/orc/orc_file_reader.cpp     |  3 +-
 be/src/vec/exec/format/orc/orc_file_reader.h       |  3 +-
 .../format/parquet/byte_array_dict_decoder.cpp     |  2 +-
 .../exec/format/parquet/byte_array_dict_decoder.h  |  3 +-
 be/src/vec/exec/format/parquet/decoder.h           |  6 +-
 .../format/parquet/fix_length_dict_decoder.hpp     |  3 +-
 .../vec/exec/format/parquet/parquet_thrift_util.h  |  5 +-
 .../parquet/vparquet_column_chunk_reader.cpp       |  6 +-
 .../format/parquet/vparquet_column_chunk_reader.h  |  2 +-
 .../exec/format/parquet/vparquet_group_reader.cpp  |  9 ++-
 .../exec/format/parquet/vparquet_group_reader.h    |  3 +-
 be/src/vec/functions/function_jsonb.cpp            |  4 +-
 be/test/vec/common/custom_allocator_test.cpp       | 90 ++++++++++++++++++++++
 .../parquet/byte_array_dict_decoder_test.cpp       |  5 +-
 .../parquet/fix_length_dict_decoder_test.cpp       |  5 +-
 29 files changed, 242 insertions(+), 58 deletions(-)

diff --git a/be/src/io/fs/broker_file_system.cpp 
b/be/src/io/fs/broker_file_system.cpp
index 0d6d0d3555a..8b0d5db23e2 100644
--- a/be/src/io/fs/broker_file_system.cpp
+++ b/be/src/io/fs/broker_file_system.cpp
@@ -22,11 +22,10 @@
 #include <gen_cpp/TPaloBrokerService.h>
 #include <gen_cpp/Types_types.h>
 #include <glog/logging.h>
-#include <stddef.h>
 #include <thrift/Thrift.h>
 #include <thrift/transport/TTransportException.h>
 
-#include <algorithm>
+#include <cstddef>
 // IWYU pragma: no_include <bits/chrono.h>
 #include <chrono> // IWYU pragma: keep
 #include <filesystem>
@@ -45,6 +44,7 @@
 #include "runtime/broker_mgr.h"
 #include "runtime/exec_env.h"
 #include "util/slice.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris::io {
 
@@ -405,7 +405,7 @@ Status BrokerFileSystem::download_impl(const Path& 
remote_file, const Path& loca
     // 4. read remote and write to local
     VLOG(2) << "read remote file: " << remote_file << " to local: " << 
local_file;
     constexpr size_t buf_sz = 1024 * 1024;
-    std::unique_ptr<uint8_t[]> read_buf(new uint8_t[buf_sz]);
+    auto read_buf = make_unique_buffer<uint8_t>(buf_sz);
     size_t cur_offset = 0;
     while (true) {
         size_t read_len = 0;
diff --git a/be/src/io/fs/buffered_reader.cpp b/be/src/io/fs/buffered_reader.cpp
index 488e235af12..a22b08c0659 100644
--- a/be/src/io/fs/buffered_reader.cpp
+++ b/be/src/io/fs/buffered_reader.cpp
@@ -23,6 +23,7 @@
 
 #include <algorithm>
 #include <chrono>
+#include <cstdint>
 #include <memory>
 
 #include "common/cast_set.h"
@@ -35,6 +36,7 @@
 #include "util/runtime_profile.h"
 #include "util/slice.h"
 #include "util/threadpool.h"
+#include "vec/common/custom_allocator.h"
 namespace doris {
 
 #include "common/compile_check_begin.h"
@@ -800,7 +802,7 @@ Status BufferedFileStreamReader::read_bytes(const uint8_t** 
buf, uint64_t offset
     }
     size_t buf_size = std::max(_max_buf_size, bytes_to_read);
     if (_buf_size < buf_size) {
-        std::unique_ptr<uint8_t[]> new_buf(new uint8_t[buf_size]);
+        auto new_buf = make_unique_buffer<uint8_t>(buf_size);
         if (offset >= _buf_start_offset && offset < _buf_end_offset) {
             memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                    _buf_end_offset - offset);
diff --git a/be/src/io/fs/buffered_reader.h b/be/src/io/fs/buffered_reader.h
index f298a3fe166..394c7eb9ad4 100644
--- a/be/src/io/fs/buffered_reader.h
+++ b/be/src/io/fs/buffered_reader.h
@@ -37,6 +37,7 @@
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
 #include "util/slice.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/typeid_cast.h"
 namespace doris {
 
@@ -655,7 +656,7 @@ protected:
     }
 
 private:
-    std::unique_ptr<uint8_t[]> _buf;
+    DorisUniqueBufferPtr<uint8_t> _buf;
     io::FileReaderSPtr _file;
     uint64_t _file_start_offset;
     uint64_t _file_end_offset;
diff --git a/be/src/io/fs/stream_load_pipe.cpp 
b/be/src/io/fs/stream_load_pipe.cpp
index 4b204a2aace..d25eb6a839c 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -28,6 +28,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/thread_context.h"
 #include "util/bit_util.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 namespace io {
@@ -90,7 +91,7 @@ Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice 
result, size_t* byt
 // If _total_length == -1, this should be a Kafka routine load task or stream 
load with chunked transfer HTTP request,
 // just get the next buffer directly from the buffer queue, because one buffer 
contains a complete piece of data.
 // Otherwise, this should be a stream load task that needs to read the 
specified amount of data.
-Status StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, 
size_t* length) {
+Status StreamLoadPipe::read_one_message(DorisUniqueBufferPtr<uint8_t>* data, 
size_t* length) {
     if (_total_length < -1) {
         return Status::InternalError("invalid, _total_length is: {}", 
_total_length);
     } else if (_total_length == 0) {
@@ -104,7 +105,7 @@ Status 
StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
     }
 
     // _total_length > 0, read the entire data
-    data->reset(new uint8_t[_total_length]);
+    *data = make_unique_buffer<uint8_t>(_total_length);
     Slice result(data->get(), _total_length);
     Status st = read_at(0, result, length);
     return st;
@@ -163,7 +164,7 @@ Status StreamLoadPipe::append(const ByteBufferPtr& buf) {
 }
 
 // read the next buffer from _buf_queue
-Status StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, 
size_t* length) {
+Status StreamLoadPipe::_read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, 
size_t* length) {
     std::unique_lock<std::mutex> l(_lock);
     while (!_cancelled && !_finished && _buf_queue.empty()) {
         _get_cond.wait(l);
@@ -181,7 +182,7 @@ Status 
StreamLoadPipe::_read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_
     }
     auto buf = _buf_queue.front();
     *length = buf->remaining();
-    data->reset(new uint8_t[*length]);
+    *data = make_unique_buffer<uint8_t>(*length);
     buf->get_bytes((char*)(data->get()), *length);
     _buf_queue.pop_front();
     _buffered_bytes -= buf->limit;
diff --git a/be/src/io/fs/stream_load_pipe.h b/be/src/io/fs/stream_load_pipe.h
index 4731c620142..cedab0b6c17 100644
--- a/be/src/io/fs/stream_load_pipe.h
+++ b/be/src/io/fs/stream_load_pipe.h
@@ -18,10 +18,10 @@
 #pragma once
 
 #include <gen_cpp/internal_service.pb.h>
-#include <stddef.h>
-#include <stdint.h>
 
 #include <condition_variable>
+#include <cstddef>
+#include <cstdint>
 #include <deque>
 #include <memory>
 #include <mutex>
@@ -29,14 +29,13 @@
 
 #include "common/status.h"
 #include "io/fs/file_reader.h"
-#include "io/fs/file_system.h"
 #include "io/fs/path.h"
 #include "runtime/message_body_sink.h"
 #include "util/byte_buffer.h"
 #include "util/slice.h"
+#include "vec/common/custom_allocator.h"
 
-namespace doris {
-namespace io {
+namespace doris::io {
 struct IOContext;
 
 static inline constexpr size_t kMaxPipeBufferedBytes = 4 * 1024 * 1024;
@@ -74,7 +73,7 @@ public:
     // called when producer/consumer failed
     virtual void cancel(const std::string& reason) override;
 
-    Status read_one_message(std::unique_ptr<uint8_t[]>* data, size_t* length);
+    Status read_one_message(DorisUniqueBufferPtr<uint8_t>* data, size_t* 
length);
 
     size_t get_queue_size() { return _buf_queue.size(); }
 
@@ -97,7 +96,7 @@ protected:
 
 private:
     // read the next buffer from _buf_queue
-    Status _read_next_buffer(std::unique_ptr<uint8_t[]>* data, size_t* length);
+    Status _read_next_buffer(DorisUniqueBufferPtr<uint8_t>* data, size_t* 
length);
 
     Status _append(const ByteBufferPtr& buf, size_t proto_byte_size = 0);
 
@@ -130,5 +129,4 @@ private:
     // the data needs to be completely read before it can be parsed.
     bool _is_chunked_transfer = false;
 };
-} // namespace io
-} // namespace doris
+} // namespace doris::io
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index 758e1168274..6ef58fa9e99 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -121,6 +121,7 @@
 #include "util/work_thread_pool.hpp"
 #include "vec/columns/column.h"
 #include "vec/columns/column_string.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/schema_util.h"
 #include "vec/common/string_ref.h"
 #include "vec/data_types/data_type.h"
@@ -2184,7 +2185,7 @@ Status Tablet::_read_cooldown_meta(const StorageResource& 
storage_resource,
     RETURN_IF_ERROR(storage_resource.fs->open_file(remote_meta_path, 
&tablet_meta_reader));
     auto file_size = tablet_meta_reader->size();
     size_t bytes_read;
-    auto buf = std::unique_ptr<uint8_t[]>(new uint8_t[file_size]);
+    auto buf = make_unique_buffer<uint8_t>(file_size);
     RETURN_IF_ERROR(tablet_meta_reader->read_at(0, {buf.get(), file_size}, 
&bytes_read));
     RETURN_IF_ERROR(tablet_meta_reader->close());
     if (!tablet_meta_pb->ParseFromArray(buf.get(), cast_set<int>(file_size))) {
diff --git a/be/src/util/faststring.cc b/be/src/util/faststring.cc
index 046e08c29d8..f9ba31fb523 100644
--- a/be/src/util/faststring.cc
+++ b/be/src/util/faststring.cc
@@ -19,7 +19,7 @@
 
 #include <glog/logging.h>
 
-#include <memory>
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 
@@ -37,7 +37,7 @@ void faststring::GrowToAtLeast(size_t newcapacity) {
 
 void faststring::GrowArray(size_t newcapacity) {
     DCHECK_GE(newcapacity, capacity_);
-    std::unique_ptr<uint8_t[]> 
newdata(reinterpret_cast<uint8_t*>(Allocator::alloc(newcapacity)));
+    DorisUniqueBufferPtr<uint8_t> newdata(newcapacity);
     if (len_ > 0) {
         memcpy(&newdata[0], &data_[0], len_);
     }
@@ -62,7 +62,7 @@ void faststring::ShrinkToFitInternal() {
         data_ = initial_data_;
         capacity_ = kInitialCapacity;
     } else {
-        std::unique_ptr<uint8_t[]> 
newdata(reinterpret_cast<uint8_t*>(Allocator::alloc(len_)));
+        DorisUniqueBufferPtr<uint8_t> newdata(len_);
         memcpy(&newdata[0], &data_[0], len_);
         Allocator::free(data_, capacity_);
         data_ = newdata.release();
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index 3aa9a062153..c8b1629b8bd 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -311,7 +311,7 @@ inline int Slice::compare(const Slice& b) const {
 }
 
 // A move-only type which manage the lifecycle of externally allocated data.
-// Unlike std::unique_ptr<uint8_t[]>, OwnedSlice remembers the size of data so 
that clients can access
+// Unlike DorisUniqueBufferPtr<uint8_t>, OwnedSlice remembers the size of data 
so that clients can access
 // the underlying buffer as a Slice.
 //
 // Usage example:
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 50527cbe32e..21db641335b 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -373,7 +373,7 @@ void* Allocator<clear_memory_, mmap_populate, use_mmap, 
MemoryAllocator,
 template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename 
MemoryAllocator,
           bool check_and_tracking_memory>
 void Allocator<clear_memory_, mmap_populate, use_mmap, MemoryAllocator,
-               check_and_tracking_memory>::free(void* buf, size_t size) {
+               check_and_tracking_memory>::free(void* buf, size_t size) const {
     if (use_mmap && size >= doris::config::mmap_threshold) {
         if (0 != munmap(buf, size)) {
             throw_bad_alloc(fmt::format("Allocator: Cannot munmap {}.", size));
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2a288c65c82..e7ede2adfa9 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -232,7 +232,7 @@ public:
     void* realloc(void* buf, size_t old_size, size_t new_size, size_t 
alignment = 0);
 
     // Free memory range.
-    void free(void* buf, size_t size);
+    void free(void* buf, size_t size) const;
 
     void release_unused() { MemoryAllocator::release_unused(); }
 
diff --git a/be/src/vec/common/custom_allocator.h 
b/be/src/vec/common/custom_allocator.h
index 0e2cea3b9e7..b37af3e53c5 100644
--- a/be/src/vec/common/custom_allocator.h
+++ b/be/src/vec/common/custom_allocator.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include <cstddef>
+#include <type_traits>
+
 #include "vec/common/allocator.h"
 #include "vec/common/allocator_fwd.h"
 
@@ -33,6 +36,77 @@ template <class Key, class T, class Compare = std::less<Key>,
           class Allocator = CustomStdAllocator<std::pair<const Key, T>>>
 using DorisMap = std::map<Key, T, Compare, Allocator>;
 
+template <typename T>
+    requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>)
+struct DorisUniqueBufferDeleter : public Allocator<false> {
+    size_t count = 0;
+
+    DorisUniqueBufferDeleter() = default;
+    DorisUniqueBufferDeleter(size_t n) : count(n) {}
+
+    void operator()(T* ptr) const noexcept {
+        if (ptr) {
+            Allocator::free(ptr, count * sizeof(T));
+        }
+    }
+};
+
+template <typename T>
+    requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>)
+class DorisUniqueBufferPtr {
+public:
+    using Deleter = DorisUniqueBufferDeleter<T>;
+
+    DorisUniqueBufferPtr() = default;
+    explicit DorisUniqueBufferPtr(T* ptr) = delete;
+
+    DorisUniqueBufferPtr(size_t size) {
+        DorisUniqueBufferDeleter<T> deleter(size);
+        void* buf = deleter.alloc(size * sizeof(T));
+        if (!buf) {
+            return;
+        }
+        T* arr = static_cast<T*>(buf);
+        ptr_ = std::unique_ptr<T[], DorisUniqueBufferDeleter<T>>(arr, 
std::move(deleter));
+    }
+
+    DorisUniqueBufferPtr(std::unique_ptr<T[], Deleter>&& uptr) : 
ptr_(std::move(uptr)) {}
+    DorisUniqueBufferPtr(DorisUniqueBufferPtr&& other) : 
ptr_(std::move(other.ptr_)) {}
+
+    DorisUniqueBufferPtr& operator=(DorisUniqueBufferPtr&& other) {
+        if (this != &other) {
+            ptr_ = std::move(other.ptr_);
+        }
+        return *this;
+    }
+
+    DorisUniqueBufferPtr(std::nullptr_t) noexcept {}
+
+    // Delete this function to avoid passing in a pointer allocated by other 
means(new/malloc).
+    void reset(T*) = delete;
+
+    bool operator==(std::nullptr_t) const noexcept { return ptr_ == nullptr; }
+    bool operator==(T* other) const noexcept { return ptr_.get() == other; }
+
+    void reset() noexcept { ptr_.reset(); }
+
+    auto release() noexcept { return ptr_.release(); }
+
+    T* get() const noexcept { return ptr_.get(); }
+    T& operator*() const noexcept { return *ptr_; }
+    T* operator->() const noexcept { return ptr_.get(); }
+    T& operator[](size_t i) const { return ptr_[i]; }
+
+private:
+    std::unique_ptr<T[], Deleter> ptr_;
+};
+
+template <typename T>
+    requires(std::is_trivial_v<T> && std::is_standard_layout_v<T>)
+DorisUniqueBufferPtr<T> make_unique_buffer(size_t n) {
+    return DorisUniqueBufferPtr<T>(n);
+}
+
 // NOTE: Even CustomStdAllocator 's allocate/dallocate could modify memory 
tracker,but it's still stateless,
 // because threadcontext owns the memtracker, not CustomStdAllocator.
 template <class T, typename MemoryAllocator>
diff --git a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h 
b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
index e81afb162c3..02a948dc839 100644
--- a/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
+++ b/be/src/vec/exec/format/file_reader/new_plain_binary_line_reader.h
@@ -24,6 +24,7 @@
 #include "common/status.h"
 #include "exec/line_reader.h"
 #include "io/fs/file_reader_writer_fwd.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 #include "common/compile_check_begin.h"
@@ -51,7 +52,7 @@ public:
 private:
     io::FileReaderSPtr _file_reader;
 
-    std::unique_ptr<uint8_t[]> _file_buf;
+    DorisUniqueBufferPtr<uint8_t> _file_buf;
     std::unique_ptr<PDataRow> _cur_row;
 };
 
diff --git a/be/src/vec/exec/format/json/new_json_reader.cpp 
b/be/src/vec/exec/format/json/new_json_reader.cpp
index e49100f82a1..e75bff2c96f 100644
--- a/be/src/vec/exec/format/json/new_json_reader.cpp
+++ b/be/src/vec/exec/format/json/new_json_reader.cpp
@@ -58,6 +58,7 @@
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_struct.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/core/column_with_type_and_name.h"
 #include "vec/data_types/data_type_array.h"
 #include "vec/data_types/data_type_factory.hpp"
@@ -244,7 +245,7 @@ Status 
NewJsonReader::get_parsed_schema(std::vector<std::string>* col_names,
                                         std::vector<DataTypePtr>* col_types) {
     bool eof = false;
     const uint8_t* json_str = nullptr;
-    std::unique_ptr<uint8_t[]> json_str_ptr;
+    DorisUniqueBufferPtr<uint8_t> json_str_ptr;
     size_t size = 0;
     if (_line_reader != nullptr) {
         RETURN_IF_ERROR(_line_reader->read_line(&json_str, &size, &eof, 
_io_ctx));
@@ -474,7 +475,8 @@ Status NewJsonReader::_read_json_column(RuntimeState* 
state, Block& block,
     return (this->*_vhandle_json_callback)(state, block, slot_descs, 
is_empty_row, eof);
 }
 
-Status NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, 
size_t* read_size) {
+Status NewJsonReader::_read_one_message(DorisUniqueBufferPtr<uint8_t>* 
file_buf,
+                                        size_t* read_size) {
     switch (_params.file_type) {
     case TFileType::FILE_LOCAL:
         [[fallthrough]];
@@ -482,7 +484,7 @@ Status 
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
         [[fallthrough]];
     case TFileType::FILE_S3: {
         size_t file_size = _file_reader->size();
-        file_buf->reset(new uint8_t[file_size]);
+        *file_buf = make_unique_buffer<uint8_t>(file_size);
         Slice result(file_buf->get(), file_size);
         RETURN_IF_ERROR(_file_reader->read_at(_current_offset, result, 
read_size, _io_ctx));
         _current_offset += *read_size;
@@ -499,7 +501,7 @@ Status 
NewJsonReader::_read_one_message(std::unique_ptr<uint8_t[]>* file_buf, si
     return Status::OK();
 }
 
-Status NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* 
file_buf,
+Status 
NewJsonReader::_read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* 
file_buf,
                                                   size_t* read_size) {
     auto* stream_load_pipe = 
dynamic_cast<io::StreamLoadPipe*>(_file_reader.get());
 
@@ -515,7 +517,7 @@ Status 
NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi
     uint64_t cur_size = 0;
 
     // second read: continuously read data from the pipe until all data is 
read.
-    std::unique_ptr<uint8_t[]> read_buf;
+    DorisUniqueBufferPtr<uint8_t> read_buf;
     size_t read_buf_size = 0;
     while (true) {
         RETURN_IF_ERROR(stream_load_pipe->read_one_message(&read_buf, 
&read_buf_size));
@@ -534,7 +536,7 @@ Status 
NewJsonReader::_read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* fi
         return Status::OK();
     }
 
-    std::unique_ptr<uint8_t[]> total_buf = 
std::make_unique<uint8_t[]>(cur_size + *read_size);
+    DorisUniqueBufferPtr<uint8_t> total_buf = 
make_unique_buffer<uint8_t>(cur_size + *read_size);
 
     // copy the data during the first read
     memcpy(total_buf.get(), file_buf->get(), *read_size);
diff --git a/be/src/vec/exec/format/json/new_json_reader.h 
b/be/src/vec/exec/format/json/new_json_reader.h
index f537f899eb3..283b1c6251e 100644
--- a/be/src/vec/exec/format/json/new_json_reader.h
+++ b/be/src/vec/exec/format/json/new_json_reader.h
@@ -37,6 +37,7 @@
 #include "io/file_factory.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "util/runtime_profile.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/string_ref.h"
 #include "vec/core/types.h"
 #include "vec/exec/format/generic_reader.h"
@@ -102,11 +103,11 @@ private:
                              const std::vector<SlotDescriptor*>& slot_descs, 
bool* is_empty_row,
                              bool* eof);
 
-    Status _read_one_message(std::unique_ptr<uint8_t[]>* file_buf, size_t* 
read_size);
+    Status _read_one_message(DorisUniqueBufferPtr<uint8_t>* file_buf, size_t* 
read_size);
 
     // StreamLoadPipe::read_one_message only reads a portion of the data when 
stream loading with a chunked transfer HTTP request.
     // Need to read all the data before performing JSON parsing.
-    Status _read_one_message_from_pipe(std::unique_ptr<uint8_t[]>* file_buf, 
size_t* read_size);
+    Status _read_one_message_from_pipe(DorisUniqueBufferPtr<uint8_t>* 
file_buf, size_t* read_size);
 
     // simdjson, replace none simdjson function if it is ready
     Status _simdjson_init_reader();
@@ -239,7 +240,7 @@ private:
     /// Set of columns which already met in row. Exception is thrown if there 
are more than one column with the same name.
     std::vector<UInt8> _seen_columns;
     // simdjson
-    std::unique_ptr<uint8_t[]> _json_str_ptr;
+    DorisUniqueBufferPtr<uint8_t> _json_str_ptr;
     const uint8_t* _json_str = nullptr;
     static constexpr size_t _init_buffer_size = 1024 * 1024 * 8;
     size_t _padded_size = _init_buffer_size + simdjson::SIMDJSON_PADDING;
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.cpp 
b/be/src/vec/exec/format/orc/orc_file_reader.cpp
index 6f1411563e7..72c9cce6215 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.cpp
+++ b/be/src/vec/exec/format/orc/orc_file_reader.cpp
@@ -18,6 +18,7 @@
 #include "vec/exec/format/orc/orc_file_reader.h"
 
 #include "util/runtime_profile.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 namespace vectorized {
@@ -60,7 +61,7 @@ Status OrcMergeRangeFileReader::read_at_impl(size_t offset, 
Slice result, size_t
 
     if (_cache == nullptr) {
         auto range_size = _range.end_offset - _range.start_offset;
-        _cache = std::make_unique<char[]>(range_size);
+        _cache = make_unique_buffer<char>(range_size);
 
         {
             SCOPED_RAW_TIMER(&_statistics.read_time);
diff --git a/be/src/vec/exec/format/orc/orc_file_reader.h 
b/be/src/vec/exec/format/orc/orc_file_reader.h
index d9d90f3e6e4..503777e67c2 100644
--- a/be/src/vec/exec/format/orc/orc_file_reader.h
+++ b/be/src/vec/exec/format/orc/orc_file_reader.h
@@ -19,6 +19,7 @@
 
 #include "io/fs/buffered_reader.h"
 #include "io/fs/file_reader.h"
+#include "vec/common/custom_allocator.h"
 
 namespace doris {
 namespace vectorized {
@@ -75,7 +76,7 @@ private:
     io::FileReaderSPtr _inner_reader;
     io::PrefetchRange _range;
 
-    std::unique_ptr<char[]> _cache;
+    DorisUniqueBufferPtr<char> _cache;
     int64_t _current_start_offset = -1;
 
     size_t _size;
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp 
b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
index 1693d5fab68..49ab5cd584b 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.cpp
@@ -29,7 +29,7 @@
 
 namespace doris::vectorized {
 #include "common/compile_check_begin.h"
-Status ByteArrayDictDecoder::set_dict(std::unique_ptr<uint8_t[]>& dict, 
int32_t length,
+Status ByteArrayDictDecoder::set_dict(DorisUniqueBufferPtr<uint8_t>& dict, 
int32_t length,
                                       size_t num_values) {
     _dict = std::move(dict);
     if (_dict == nullptr) {
diff --git a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h 
b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
index 3a87801ac4b..762a9c5b885 100644
--- a/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
+++ b/be/src/vec/exec/format/parquet/byte_array_dict_decoder.h
@@ -49,7 +49,8 @@ public:
     Status _decode_values(MutableColumnPtr& doris_column, DataTypePtr& 
data_type,
                           ColumnSelectVector& select_vector, bool 
is_dict_filter);
 
-    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t 
num_values) override;
+    Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
+                    size_t num_values) override;
 
     Status read_dict_values_to_column(MutableColumnPtr& doris_column) override;
 
diff --git a/be/src/vec/exec/format/parquet/decoder.h 
b/be/src/vec/exec/format/parquet/decoder.h
index 5b0098324c2..81f328ded43 100644
--- a/be/src/vec/exec/format/parquet/decoder.h
+++ b/be/src/vec/exec/format/parquet/decoder.h
@@ -33,6 +33,7 @@
 #include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/pod_array_fwd.h"
 #include "vec/core/types.h"
 #include "vec/data_types/data_type.h"
@@ -70,7 +71,8 @@ public:
 
     virtual Status skip_values(size_t num_values) = 0;
 
-    virtual Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, 
size_t num_values) {
+    virtual Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t 
length,
+                            size_t num_values) {
         return Status::NotSupported("set_dict is not supported");
     }
 
@@ -151,7 +153,7 @@ protected:
     }
 
     // For dictionary encoding
-    std::unique_ptr<uint8_t[]> _dict;
+    DorisUniqueBufferPtr<uint8_t> _dict;
     std::unique_ptr<RleBatchDecoder<uint32_t>> _index_batch_decoder;
     std::vector<uint32_t> _indexes;
 };
diff --git a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp 
b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
index 3b9966e9f36..b997c10da51 100644
--- a/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
+++ b/be/src/vec/exec/format/parquet/fix_length_dict_decoder.hpp
@@ -103,7 +103,8 @@ protected:
         return Status::OK();
     }
 
-    Status set_dict(std::unique_ptr<uint8_t[]>& dict, int32_t length, size_t 
num_values) override {
+    Status set_dict(DorisUniqueBufferPtr<uint8_t>& dict, int32_t length,
+                    size_t num_values) override {
         if (num_values * _type_length != length) {
             return Status::Corruption("Wrong dictionary data for fixed length 
type");
         }
diff --git a/be/src/vec/exec/format/parquet/parquet_thrift_util.h 
b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
index 346bbe60c02..658a32416b7 100644
--- a/be/src/vec/exec/format/parquet/parquet_thrift_util.h
+++ b/be/src/vec/exec/format/parquet/parquet_thrift_util.h
@@ -28,6 +28,7 @@
 #include "olap/iterators.h"
 #include "util/coding.h"
 #include "util/thrift_util.h"
+#include "vec/common/custom_allocator.h"
 #include "vparquet_file_metadata.h"
 
 namespace doris::vectorized {
@@ -62,10 +63,10 @@ static Status parse_thrift_footer(io::FileReaderSPtr file,
         return Status::Corruption("Parquet footer size({}) is large than file 
size({})",
                                   metadata_size, file_size);
     }
-    std::unique_ptr<uint8_t[]> new_buff;
+    DorisUniqueBufferPtr<uint8_t> new_buff;
     uint8_t* meta_ptr;
     if (metadata_size > bytes_read - PARQUET_FOOTER_SIZE) {
-        new_buff.reset(new uint8_t[metadata_size]);
+        new_buff = make_unique_buffer<uint8_t>(metadata_size);
         RETURN_IF_ERROR(file->read_at(file_size - PARQUET_FOOTER_SIZE - 
metadata_size,
                                       Slice(new_buff.get(), metadata_size), 
&bytes_read, io_ctx));
         meta_ptr = new_buff.get();
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
index 825e0092f60..5bcf4abf919 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.cpp
@@ -21,6 +21,7 @@
 #include <glog/logging.h>
 #include <string.h>
 
+#include <cstdint>
 #include <utility>
 
 #include "common/compiler_util.h" // IWYU pragma: keep
@@ -28,6 +29,7 @@
 #include "util/block_compression.h"
 #include "util/runtime_profile.h"
 #include "vec/columns/column.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/exec/format/parquet/decoder.h"
 #include "vec/exec/format/parquet/level_decoder.h"
 #include "vec/exec/format/parquet/schema_desc.h"
@@ -232,7 +234,7 @@ Status ColumnChunkReader::_decode_dict_page() {
 
     // Prepare dictionary data
     int32_t uncompressed_size = header->uncompressed_page_size;
-    std::unique_ptr<uint8_t[]> dict_data(new uint8_t[uncompressed_size]);
+    auto dict_data = make_unique_buffer<uint8_t>(uncompressed_size);
     if (_block_compress_codec != nullptr) {
         Slice compressed_data;
         RETURN_IF_ERROR(_page_reader->get_page_data(compressed_data));
@@ -265,7 +267,7 @@ Status ColumnChunkReader::_decode_dict_page() {
 void ColumnChunkReader::_reserve_decompress_buf(size_t size) {
     if (size > _decompress_buf_size) {
         _decompress_buf_size = BitUtil::next_power_of_two(size);
-        _decompress_buf.reset(new uint8_t[_decompress_buf_size]);
+        _decompress_buf = make_unique_buffer<uint8_t>(_decompress_buf_size);
     }
 }
 
diff --git a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h 
b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
index e52973b732e..db0530da597 100644
--- a/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_column_chunk_reader.h
@@ -215,7 +215,7 @@ private:
     size_t _chunk_parsed_values = 0;
     uint32_t _remaining_num_values = 0;
     Slice _page_data;
-    std::unique_ptr<uint8_t[]> _decompress_buf;
+    DorisUniqueBufferPtr<uint8_t> _decompress_buf;
     size_t _decompress_buf_size = 0;
     Slice _v2_rep_levels;
     Slice _v2_def_levels;
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp 
b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
index 7d14a87ea1d..96feeceea8d 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.cpp
@@ -44,6 +44,7 @@
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_vector.h"
 #include "vec/common/assert_cast.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/common/pod_array.h"
 #include "vec/core/block.h"
 #include "vec/core/column_with_type_and_name.h"
@@ -565,7 +566,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
     }
 
     FilterMap& filter_map = *filter_map_ptr;
-    std::unique_ptr<uint8_t[]> rebuild_filter_map = nullptr;
+    DorisUniqueBufferPtr<uint8_t> rebuild_filter_map = nullptr;
     if (_cached_filtered_rows != 0) {
         RETURN_IF_ERROR(_rebuild_filter_map(filter_map, rebuild_filter_map, pre_read_rows));
         pre_read_rows += _cached_filtered_rows;
@@ -625,7 +626,7 @@ Status RowGroupReader::_do_lazy_read(Block* block, size_t batch_size, size_t* re
 }
 
 Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
-                                           std::unique_ptr<uint8_t[]>& filter_map_data,
+                                           DorisUniqueBufferPtr<uint8_t>& filter_map_data,
                                            size_t pre_read_rows) const {
     if (_cached_filtered_rows == 0) {
         return Status::OK();
@@ -636,8 +637,8 @@ Status RowGroupReader::_rebuild_filter_map(FilterMap& filter_map,
         return Status::OK();
     }
 
-    uint8_t* map = new uint8_t[total_rows];
-    filter_map_data.reset(map);
+    filter_map_data = make_unique_buffer<uint8_t>(total_rows);
+    auto* map = filter_map_data.get();
     for (size_t i = 0; i < _cached_filtered_rows; ++i) {
         map[i] = 0;
     }
diff --git a/be/src/vec/exec/format/parquet/vparquet_group_reader.h b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
index 002163bec9a..9d0a59a4e91 100644
--- a/be/src/vec/exec/format/parquet/vparquet_group_reader.h
+++ b/be/src/vec/exec/format/parquet/vparquet_group_reader.h
@@ -192,7 +192,8 @@ private:
                              size_t batch_size, size_t* read_rows, bool* batch_eof,
                              FilterMap& filter_map);
     Status _do_lazy_read(Block* block, size_t batch_size, size_t* read_rows, bool* batch_eof);
-    Status _rebuild_filter_map(FilterMap& filter_map, std::unique_ptr<uint8_t[]>& filter_map_data,
+    Status _rebuild_filter_map(FilterMap& filter_map,
+                               DorisUniqueBufferPtr<uint8_t>& filter_map_data,
                                size_t pre_read_rows) const;
     Status _fill_partition_columns(
             Block* block, size_t rows,
diff --git a/be/src/vec/functions/function_jsonb.cpp b/be/src/vec/functions/function_jsonb.cpp
index 89d5274db92..dfc0101c475 100644
--- a/be/src/vec/functions/function_jsonb.cpp
+++ b/be/src/vec/functions/function_jsonb.cpp
@@ -2007,7 +2007,7 @@ public:
 
         JsonbWriter writer;
         struct DocumentBuffer {
-            std::unique_ptr<char[]> ptr;
+            DorisUniqueBufferPtr<char> ptr;
             size_t size = 0;
             size_t capacity = 0;
         };
@@ -2099,7 +2099,7 @@ public:
                 if (writer_output->getSize() > tmp_buffer.capacity) {
                     tmp_buffer.capacity =
                             ((size_t(writer_output->getSize()) + 1024 - 1) / 1024) * 1024;
-                    tmp_buffer.ptr = std::make_unique<char[]>(tmp_buffer.capacity);
+                    tmp_buffer.ptr = make_unique_buffer<char>(tmp_buffer.capacity);
                     DCHECK_LE(writer_output->getSize(), tmp_buffer.capacity);
                 }
 
diff --git a/be/test/vec/common/custom_allocator_test.cpp b/be/test/vec/common/custom_allocator_test.cpp
new file mode 100644
index 00000000000..23adf86c667
--- /dev/null
+++ b/be/test/vec/common/custom_allocator_test.cpp
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/ClickHouse/ClickHouse/blob/master/src/Common/tests/gtest_pod_array.cpp
+// and modified by Doris
+
+#include "vec/common/custom_allocator.h"
+
+#include <gtest/gtest.h>
+
+#include <memory>
+
+namespace doris {
+TEST(CustomAllocatorTest, StdAllocatorBasic) {
+    using Alloc = CustomStdAllocator<int32_t>;
+    Alloc alloc;
+
+    int32_t* buf = alloc.allocate(10);
+    for (int i = 0; i < 10; ++i) {
+        buf[i] = i;
+    }
+    for (int i = 0; i < 10; ++i) {
+        ASSERT_EQ(buf[i], i);
+    }
+    alloc.deallocate(buf, 10);
+}
+
+TEST(DorisUniqueBufferPtrTest, Basic) {
+    auto buf = make_unique_buffer<int32_t>(10);
+    for (int32_t i = 0; i != 10; ++i) {
+        buf[i] = i;
+    }
+    for (int32_t i = 0; i != 10; ++i) {
+        ASSERT_EQ(buf[i], i);
+    }
+
+    DorisUniqueBufferPtr<int32_t> buf2 = std::move(buf);
+    for (int32_t i = 0; i != 10; ++i) {
+        ASSERT_EQ(buf2[i], i);
+    }
+
+    ASSERT_EQ(buf, nullptr);
+
+    auto* ptr = buf2.release();
+    for (int32_t i = 0; i != 10; ++i) {
+        ASSERT_EQ(ptr[i], i);
+    }
+
+    ASSERT_EQ(buf2, nullptr);
+    Allocator<false> {}.free(ptr, 10 * sizeof(int32_t));
+
+    DorisUniqueBufferPtr<int32_t> buf3(100);
+    ASSERT_NE(buf3, nullptr);
+
+    buf3.reset();
+    ASSERT_EQ(buf3, nullptr);
+
+    DorisUniqueBufferPtr<int32_t>::Deleter deleter(50);
+    auto unique_ptr = std::unique_ptr<int32_t[], DorisUniqueBufferPtr<int32_t>::Deleter>(
+            static_cast<int32_t*>(deleter.alloc(50 * sizeof(int32_t))), std::move(deleter));
+
+    DorisUniqueBufferPtr<int32_t> buf4(std::move(unique_ptr));
+    ASSERT_NE(buf4, nullptr);
+
+    ptr = buf4.get();
+
+    DorisUniqueBufferPtr<int32_t> buf5(std::move(buf4));
+
+    ASSERT_EQ(ptr, buf5.get());
+    ASSERT_EQ(ptr, buf5);
+
+    DorisUniqueBufferPtr<int32_t> buf6 = nullptr;
+    ASSERT_EQ(buf6, nullptr);
+}
+
+}; // namespace doris
\ No newline at end of file
diff --git a/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp
index a6450e253c0..5115fdf8b74 100644
--- a/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp
+++ b/be/test/vec/exec/format/parquet/byte_array_dict_decoder_test.cpp
@@ -26,6 +26,7 @@
 #include "vec/columns/column_dictionary.h"
 #include "vec/columns/column_string.h"
 #include "vec/columns/column_vector.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/data_types/data_type_string.h"
 
 namespace doris::vectorized {
@@ -43,7 +44,7 @@ protected:
             dict_data_size += 4 + strlen(values[i]); // 4 bytes for length + string data
         }
 
-        auto dict_data = std::make_unique<uint8_t[]>(dict_data_size);
+        auto dict_data = make_unique_buffer<uint8_t>(dict_data_size);
         size_t offset = 0;
         for (int i = 0; i < 3; i++) {
             uint32_t len = strlen(values[i]);
@@ -180,7 +181,7 @@ TEST_F(ByteArrayDictDecoderTest, test_decode_with_filter_and_null) {
 // Test empty dictionary case
 TEST_F(ByteArrayDictDecoderTest, test_empty_dict) {
     ByteArrayDictDecoder empty_decoder;
-    auto dict_data = std::make_unique<uint8_t[]>(0);
+    auto dict_data = make_unique_buffer<uint8_t>(0);
     ASSERT_TRUE(empty_decoder.set_dict(dict_data, 0, 0).ok());
 }
 
diff --git a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
index 187f9ff30dd..1a2302d8558 100644
--- a/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
+++ b/be/test/vec/exec/format/parquet/fix_length_dict_decoder_test.cpp
@@ -21,6 +21,7 @@
 
 #include "util/slice.h"
 #include "vec/columns/column_vector.h"
+#include "vec/common/custom_allocator.h"
 #include "vec/data_types/data_type_number.h"
 
 namespace doris::vectorized {
@@ -33,7 +34,7 @@ protected:
         size_t dict_size = 3;
         size_t dict_data_size = dict_size * _type_length;
 
-        auto dict_data = std::make_unique<uint8_t[]>(dict_data_size);
+        auto dict_data = make_unique_buffer<uint8_t>(dict_data_size);
         const char* values[3] = {"apple ", "banana", "cherry"}; // Dictionary values
         for (int i = 0; i < 3; i++) {
             memcpy(dict_data.get() + i * _type_length, values[i], _type_length);
@@ -202,7 +203,7 @@ TEST_F(FixLengthDictDecoderTest, test_empty_dict) {
     FixLengthDictDecoder empty_decoder;
     empty_decoder.set_type_length(sizeof(int32_t));
 
-    auto dict_data = std::make_unique<uint8_t[]>(0);
+    auto dict_data = make_unique_buffer<uint8_t>(0);
     ASSERT_TRUE(empty_decoder.set_dict(dict_data, 0, 0).ok());
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to