This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c5a19982a6b [Feature](multi-catalog) Add memory tracker for orc
reader/writer and arrow parquet writer. (#37234)
c5a19982a6b is described below
commit c5a19982a6bc5c7cffc98c1694d106e9925ecc66
Author: Qi Chen <[email protected]>
AuthorDate: Tue Jul 30 09:00:44 2024 +0800
[Feature](multi-catalog) Add memory tracker for orc reader/writer and arrow
parquet writer. (#37234)
## Proposed changes
[Feature](multi-catalog) Add memory trackers for the ORC reader/writer and
the Arrow Parquet writer.
## Future work
- Since the Parquet reader is implemented in-house and does not use the
third-party Arrow library, some of its memory usage still needs to be added
to the memory tracker.
- Add read and write operator-level memory trackers to the profile.
---
be/src/runtime/exec_env.h | 13 ++
be/src/runtime/exec_env_init.cpp | 9 +
be/src/util/faststring.h | 2 +-
be/src/util/slice.h | 2 +-
be/src/vec/common/allocator.cpp | 90 ++++++----
be/src/vec/common/allocator.h | 165 +++++++++++++++--
be/src/vec/common/allocator_fwd.h | 4 +-
be/src/vec/common/hash_table/phmap_fwd_decl.h | 2 +-
be/src/vec/exec/format/orc/orc_memory_pool.h | 53 ++++++
be/src/vec/exec/format/orc/vorc_reader.cpp | 2 +
.../vec/exec/format/parquet/arrow_memory_pool.cpp | 74 ++++++++
be/src/vec/exec/format/parquet/arrow_memory_pool.h | 199 +++++++++++++++++++++
be/src/vec/runtime/vorc_transformer.cpp | 22 +--
be/src/vec/runtime/vparquet_transformer.cpp | 15 +-
14 files changed, 591 insertions(+), 61 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 3434d01a59e..89e5593c84b 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -36,6 +36,13 @@
#include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove
this include header
#include "util/threadpool.h"
+namespace orc {
+class MemoryPool;
+}
+namespace arrow {
+class MemoryPool;
+}
+
namespace doris {
namespace vectorized {
class VDataStreamMgr;
@@ -305,6 +312,9 @@ public:
segment_v2::TmpFileDirs* get_tmp_file_dirs() { return
_tmp_file_dirs.get(); }
io::FDCache* file_cache_open_fd_cache() const { return
_file_cache_open_fd_cache.get(); }
+ orc::MemoryPool* orc_memory_pool() { return _orc_memory_pool; }
+ arrow::MemoryPool* arrow_memory_pool() { return _arrow_memory_pool; }
+
private:
ExecEnv();
@@ -435,6 +445,9 @@ private:
std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx;
std::unique_ptr<segment_v2::TmpFileDirs> _tmp_file_dirs;
doris::vectorized::SpillStreamManager* _spill_stream_mgr = nullptr;
+
+ orc::MemoryPool* _orc_memory_pool = nullptr;
+ arrow::MemoryPool* _arrow_memory_pool = nullptr;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 32fbc4e0af4..6740f548761 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -100,6 +100,8 @@
#include "util/threadpool.h"
#include "util/thrift_rpc_helper.h"
#include "util/timezone_utils.h"
+#include "vec/exec/format/orc/orc_memory_pool.h"
+#include "vec/exec/format/parquet/arrow_memory_pool.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/runtime/vdata_stream_mgr.h"
#include "vec/sink/delta_writer_v2_pool.h"
@@ -573,6 +575,10 @@ Status ExecEnv::_init_mem_env() {
<< PrettyPrinter::print(inverted_index_cache_limit, TUnit::BYTES)
<< ", origin config value: " <<
config::inverted_index_query_cache_limit;
+ // init orc memory pool
+ _orc_memory_pool = new doris::vectorized::ORCMemoryPool();
+ _arrow_memory_pool = new doris::vectorized::ArrowMemoryPool();
+
return Status::OK();
}
@@ -751,6 +757,9 @@ void ExecEnv::destroy() {
// We should free task scheduler finally because task queue / scheduler
maybe used by pipelineX.
SAFE_DELETE(_without_group_task_scheduler);
+ SAFE_DELETE(_arrow_memory_pool);
+ SAFE_DELETE(_orc_memory_pool);
+
// dns cache is a global instance and need to be released at last
SAFE_DELETE(_dns_cache);
diff --git a/be/src/util/faststring.h b/be/src/util/faststring.h
index 8d9fa6d004f..3ec0acbda01 100644
--- a/be/src/util/faststring.h
+++ b/be/src/util/faststring.h
@@ -35,7 +35,7 @@ namespace doris {
// common use cases (in particular, resize() will fill with uninitialized data
// instead of memsetting to \0)
// only build() can transfer data to the outside.
-class faststring : private Allocator<false, false, false> {
+class faststring : private Allocator<false, false, false,
DefaultMemoryAllocator> {
public:
enum { kInitialCapacity = 32 };
diff --git a/be/src/util/slice.h b/be/src/util/slice.h
index 80f9616f3da..bae33d4ee75 100644
--- a/be/src/util/slice.h
+++ b/be/src/util/slice.h
@@ -340,7 +340,7 @@ struct SliceMap {
//
// only receive the memory allocated by Allocator and disables mmap,
// otherwise the memory may not be freed correctly, currently only be
constructed by faststring.
-class OwnedSlice : private Allocator<false, false, false> {
+class OwnedSlice : private Allocator<false, false, false,
DefaultMemoryAllocator> {
public:
OwnedSlice() : _slice((uint8_t*)nullptr, 0) {}
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index 2b1c05533cd..879e98d0ca4 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -40,8 +40,12 @@
#include "util/stack_util.h"
#include "util/uid_util.h"
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t size) const {
+std::unordered_map<void*, size_t> RecordSizeMemoryAllocator::_allocated_sizes;
+std::mutex RecordSizeMemoryAllocator::_mutex;
+
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::sys_memory_check(
+ size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
@@ -155,8 +159,9 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::sys_memory_check(size_t
}
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate,
use_mmap>::memory_tracker_check(size_t size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::memory_tracker_check(
+ size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
@@ -191,24 +196,27 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::memory_tracker_check(siz
}
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::memory_check(size_t
size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::memory_check(
+ size_t size) const {
sys_memory_check(size);
memory_tracker_check(size);
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::consume_memory(size_t
size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::consume_memory(
+ size_t size) const {
CONSUME_THREAD_MEM_TRACKER(size);
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::release_memory(size_t
size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::release_memory(
+ size_t size) const {
RELEASE_THREAD_MEM_TRACKER(size);
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate, use_mmap>::throw_bad_alloc(
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::throw_bad_alloc(
const std::string& err) const {
LOG(WARNING) << err
<< fmt::format("{}, Stacktrace: {}",
@@ -219,9 +227,9 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::throw_bad_alloc(
}
#ifndef NDEBUG
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate,
use_mmap>::add_address_sanitizers(void* buf,
-
size_t size) const {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::add_address_sanitizers(
+ void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
return;
@@ -230,8 +238,8 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::add_address_sanitizers(v
doris::thread_context()->thread_mem_tracker()->add_address_sanitizers(buf,
size);
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void Allocator<clear_memory_, mmap_populate,
use_mmap>::remove_address_sanitizers(
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::remove_address_sanitizers(
void* buf, size_t size) const {
#ifdef BE_TEST
if (!doris::ExecEnv::ready()) {
@@ -242,23 +250,43 @@ void Allocator<clear_memory_, mmap_populate,
use_mmap>::remove_address_sanitizer
}
#endif
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void* Allocator<clear_memory_, mmap_populate, use_mmap>::alloc(size_t size,
size_t alignment) {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void* Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::alloc(size_t size,
+
size_t alignment) {
return alloc_impl(size, alignment);
}
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
-void* Allocator<clear_memory_, mmap_populate, use_mmap>::realloc(void* buf,
size_t old_size,
- size_t
new_size,
- size_t
alignment) {
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
+void* Allocator<clear_memory_, mmap_populate, use_mmap,
MemoryAllocator>::realloc(
+ void* buf, size_t old_size, size_t new_size, size_t alignment) {
return realloc_impl(buf, old_size, new_size, alignment);
}
-template class Allocator<true, true, true>;
-template class Allocator<true, true, false>;
-template class Allocator<true, false, true>;
-template class Allocator<true, false, false>;
-template class Allocator<false, true, true>;
-template class Allocator<false, true, false>;
-template class Allocator<false, false, true>;
-template class Allocator<false, false, false>;
+template class Allocator<true, true, true, DefaultMemoryAllocator>;
+template class Allocator<true, true, false, DefaultMemoryAllocator>;
+template class Allocator<true, false, true, DefaultMemoryAllocator>;
+template class Allocator<true, false, false, DefaultMemoryAllocator>;
+template class Allocator<false, true, true, DefaultMemoryAllocator>;
+template class Allocator<false, true, false, DefaultMemoryAllocator>;
+template class Allocator<false, false, true, DefaultMemoryAllocator>;
+template class Allocator<false, false, false, DefaultMemoryAllocator>;
+
+/** It would be better to put these Memory Allocators where they are used,
such as in the orc memory pool and arrow memory pool.
+ * But currently allocators use templates in .cpp instead of all in .h, so
they can only be placed here.
+ */
+template class Allocator<true, true, false, ORCMemoryAllocator>;
+template class Allocator<true, false, true, ORCMemoryAllocator>;
+template class Allocator<true, false, false, ORCMemoryAllocator>;
+template class Allocator<false, true, true, ORCMemoryAllocator>;
+template class Allocator<false, true, false, ORCMemoryAllocator>;
+template class Allocator<false, false, true, ORCMemoryAllocator>;
+template class Allocator<false, false, false, ORCMemoryAllocator>;
+
+template class Allocator<true, true, true, RecordSizeMemoryAllocator>;
+template class Allocator<true, true, false, RecordSizeMemoryAllocator>;
+template class Allocator<true, false, true, RecordSizeMemoryAllocator>;
+template class Allocator<true, false, false, RecordSizeMemoryAllocator>;
+template class Allocator<false, true, true, RecordSizeMemoryAllocator>;
+template class Allocator<false, true, false, RecordSizeMemoryAllocator>;
+template class Allocator<false, false, true, RecordSizeMemoryAllocator>;
+template class Allocator<false, false, false, RecordSizeMemoryAllocator>;
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 3c513f270bb..88c85dceeb3 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -23,6 +23,10 @@
// TODO: Readable
#include <fmt/format.h>
+#if defined(USE_JEMALLOC)
+#include <jemalloc/jemalloc.h>
+#endif // defined(USE_JEMALLOC)
+#include <malloc.h>
#include <stdint.h>
#include <string.h>
@@ -68,6 +72,128 @@ static constexpr size_t MALLOC_MIN_ALIGNMENT = 8;
// is always a multiple of sixteen.
(https://www.gnu.org/software/libc/manual/html_node/Aligned-Memory-Blocks.html)
static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
+class DefaultMemoryAllocator {
+public:
+ static void* malloc(size_t size) __THROW { return std::malloc(size); }
+
+ static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n,
size); }
+
+ static constexpr bool need_record_actual_size() { return false; }
+
+ static int posix_memalign(void** ptr, size_t alignment, size_t size)
__THROW {
+ return ::posix_memalign(ptr, alignment, size);
+ }
+
+ static void* realloc(void* ptr, size_t size) __THROW { return
std::realloc(ptr, size); }
+
+ static void free(void* p) __THROW { std::free(p); }
+
+ static void release_unused() {
+#if defined(USE_JEMALLOC)
+ jemallctl(fmt::format("arena.{}.purge", MALLCTL_ARENAS_ALL).c_str(),
NULL, NULL, NULL, 0);
+#endif // defined(USE_JEMALLOC)
+ }
+};
+
+/** It would be better to put these Memory Allocators where they are used,
such as in the orc memory pool and arrow memory pool.
+ * But currently allocators use templates in .cpp instead of all in .h, so
they can only be placed here.
+ */
+class ORCMemoryAllocator {
+public:
+ static void* malloc(size_t size) __THROW { return
reinterpret_cast<char*>(std::malloc(size)); }
+
+ static void* calloc(size_t n, size_t size) __THROW { return std::calloc(n,
size); }
+
+ static constexpr bool need_record_actual_size() { return true; }
+
+ static size_t allocated_size(void* ptr) { return malloc_usable_size(ptr); }
+
+ static int posix_memalign(void** ptr, size_t alignment, size_t size)
__THROW {
+ return ::posix_memalign(ptr, alignment, size);
+ }
+
+ static void* realloc(void* ptr, size_t size) __THROW {
+ LOG(FATAL) << "__builtin_unreachable";
+ __builtin_unreachable();
+ }
+
+ static void free(void* p) __THROW { std::free(p); }
+
+ static void release_unused() {}
+};
+
+class RecordSizeMemoryAllocator {
+public:
+ static void* malloc(size_t size) __THROW {
+ void* p = std::malloc(size);
+ if (p) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _allocated_sizes[p] = size;
+ }
+ return p;
+ }
+
+ static void* calloc(size_t n, size_t size) __THROW {
+ void* p = std::calloc(n, size);
+ if (p) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _allocated_sizes[p] = n * size;
+ }
+ return p;
+ }
+
+ static constexpr bool need_record_actual_size() { return false; }
+
+ static size_t allocated_size(void* ptr) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _allocated_sizes.find(ptr);
+ if (it != _allocated_sizes.end()) {
+ return it->second;
+ }
+ return 0;
+ }
+
+ static int posix_memalign(void** ptr, size_t alignment, size_t size)
__THROW {
+ int ret = ::posix_memalign(ptr, alignment, size);
+ if (ret == 0 && *ptr) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _allocated_sizes[*ptr] = size;
+ }
+ return ret;
+ }
+
+ static void* realloc(void* ptr, size_t size) __THROW {
+ std::lock_guard<std::mutex> lock(_mutex);
+
+ auto it = _allocated_sizes.find(ptr);
+ if (it != _allocated_sizes.end()) {
+ _allocated_sizes.erase(it);
+ }
+
+ void* p = std::realloc(ptr, size);
+
+ if (p) {
+ _allocated_sizes[p] = size;
+ }
+
+ return p;
+ }
+
+ static void free(void* p) __THROW {
+ if (p) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _allocated_sizes.erase(p);
+ std::free(p);
+ }
+ }
+
+ static void release_unused() {}
+
+private:
+ static std::unordered_map<void*, size_t> _allocated_sizes;
+ static std::mutex _mutex;
+};
+
/** Responsible for allocating / freeing memory. Used, for example, in
PODArray, Arena.
* Also used in hash tables.
* The interface is different from std::allocator
@@ -78,7 +204,7 @@ static constexpr int ALLOCATOR_ALIGNMENT_16 = 16;
* - random hint address for mmap
* - mmap_threshold for using mmap less or more
*/
-template <bool clear_memory_, bool mmap_populate, bool use_mmap>
+template <bool clear_memory_, bool mmap_populate, bool use_mmap, typename
MemoryAllocator>
class Allocator {
public:
void sys_memory_check(size_t size) const;
@@ -104,6 +230,7 @@ public:
// consume memory in tracker before alloc, similar to early
declaration.
consume_memory(size);
void* buf;
+ size_t record_size = size;
if (use_mmap && size >= doris::config::mmap_threshold) {
if (alignment > MMAP_MIN_ALIGNMENT)
@@ -117,38 +244,51 @@ public:
release_memory(size);
throw_bad_alloc(fmt::format("Allocator: Cannot mmap {}.",
size));
}
+ if constexpr (MemoryAllocator::need_record_actual_size()) {
+ record_size = MemoryAllocator::allocated_size(buf);
+ }
/// No need for zero-fill, because mmap guarantees it.
} else {
if (alignment <= MALLOC_MIN_ALIGNMENT) {
if constexpr (clear_memory)
- buf = ::calloc(size, 1);
+ buf = MemoryAllocator::calloc(size, 1);
else
- buf = ::malloc(size);
+ buf = MemoryAllocator::malloc(size);
if (nullptr == buf) {
release_memory(size);
throw_bad_alloc(fmt::format("Allocator: Cannot malloc
{}.", size));
}
+ if constexpr (MemoryAllocator::need_record_actual_size()) {
+ record_size = MemoryAllocator::allocated_size(buf);
+ }
#ifndef NDEBUG
- add_address_sanitizers(buf, size);
+ add_address_sanitizers(buf, record_size);
#endif
} else {
buf = nullptr;
- int res = posix_memalign(&buf, alignment, size);
+ int res = MemoryAllocator::posix_memalign(&buf, alignment,
size);
if (0 != res) {
release_memory(size);
throw_bad_alloc(
fmt::format("Cannot allocate memory
(posix_memalign) {}.", size));
}
-#ifndef NDEBUG
- add_address_sanitizers(buf, size);
-#endif
if constexpr (clear_memory) memset(buf, 0, size);
+
+ if constexpr (MemoryAllocator::need_record_actual_size()) {
+ record_size = MemoryAllocator::allocated_size(buf);
+ }
+#ifndef NDEBUG
+ add_address_sanitizers(buf, record_size);
+#endif
}
}
+ if constexpr (MemoryAllocator::need_record_actual_size()) {
+ consume_memory(record_size - size);
+ }
return buf;
}
@@ -162,11 +302,13 @@ public:
#ifndef NDEBUG
remove_address_sanitizers(buf, size);
#endif
- ::free(buf);
+ MemoryAllocator::free(buf);
}
release_memory(size);
}
+ void release_unused() { MemoryAllocator::release_unused(); }
+
/** Enlarge memory range.
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
@@ -187,7 +329,7 @@ public:
remove_address_sanitizers(buf, old_size);
#endif
/// Resize malloc'd memory region with no special alignment
requirement.
- void* new_buf = ::realloc(buf, new_size);
+ void* new_buf = MemoryAllocator::realloc(buf, new_size);
if (nullptr == new_buf) {
release_memory(new_size - old_size);
throw_bad_alloc(fmt::format("Allocator: Cannot realloc from {}
to {}.", old_size,
@@ -195,7 +337,8 @@ public:
}
#ifndef NDEBUG
add_address_sanitizers(
- new_buf, new_size); // usually, buf addr = new_buf addr,
asan maybe not equal.
+ new_buf,
+ new_size); // usually, buf addr = new_buf addr, asan maybe
not equal.
#endif
buf = new_buf;
diff --git a/be/src/vec/common/allocator_fwd.h
b/be/src/vec/common/allocator_fwd.h
index 988f7a5c7af..da43030a133 100644
--- a/be/src/vec/common/allocator_fwd.h
+++ b/be/src/vec/common/allocator_fwd.h
@@ -24,7 +24,9 @@
#pragma once
#include <cstddef>
-template <bool clear_memory_, bool mmap_populate = false, bool use_mmap =
false>
+class DefaultMemoryAllocator;
+template <bool clear_memory_, bool mmap_populate = false, bool use_mmap =
false,
+ typename MemoryAllocator = DefaultMemoryAllocator>
class Allocator;
template <typename Base, size_t N = 64, size_t Alignment = 1>
diff --git a/be/src/vec/common/hash_table/phmap_fwd_decl.h
b/be/src/vec/common/hash_table/phmap_fwd_decl.h
index 62373410968..c6ff77a7e71 100644
--- a/be/src/vec/common/hash_table/phmap_fwd_decl.h
+++ b/be/src/vec/common/hash_table/phmap_fwd_decl.h
@@ -26,7 +26,7 @@ namespace doris::vectorized {
/// `Allocator_` implements several interfaces of `std::allocator`
/// which `phmap::flat_hash_map` will use.
template <typename T>
-class Allocator_ : private Allocator<true, false, false> {
+class Allocator_ : private Allocator<true, false, false,
DefaultMemoryAllocator> {
public:
using value_type = T;
using pointer = T*;
diff --git a/be/src/vec/exec/format/orc/orc_memory_pool.h
b/be/src/vec/exec/format/orc/orc_memory_pool.h
new file mode 100644
index 00000000000..1df3d63f952
--- /dev/null
+++ b/be/src/vec/exec/format/orc/orc_memory_pool.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "orc/MemoryPool.hh"
+#include "vec/common/allocator.h"
+
+#if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) ||
defined(THREAD_SANITIZER)
+using ORC_MEMORY_ALLOCATOR = RecordSizeMemoryAllocator;
+#else
+using ORC_MEMORY_ALLOCATOR = ORCMemoryAllocator;
+#endif
+
+namespace doris::vectorized {
+
+class ORCMemoryPool : public orc::MemoryPool {
+public:
+ char* malloc(uint64_t size) override {
+ char* p = reinterpret_cast<char*>(_allocator.alloc(size));
+ return p;
+ }
+
+ void free(char* p) override {
+ if (p == nullptr) {
+ return;
+ }
+ size_t size = ORC_MEMORY_ALLOCATOR::allocated_size(p);
+ _allocator.free(p, size);
+ }
+
+ ORCMemoryPool() = default;
+ ~ORCMemoryPool() override = default;
+
+private:
+ Allocator<false, false, false, ORC_MEMORY_ALLOCATOR> _allocator;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/orc/vorc_reader.cpp
b/be/src/vec/exec/format/orc/vorc_reader.cpp
index e2ba3a57be8..b70e3496133 100644
--- a/be/src/vec/exec/format/orc/vorc_reader.cpp
+++ b/be/src/vec/exec/format/orc/vorc_reader.cpp
@@ -71,6 +71,7 @@
#include "vec/data_types/data_type_map.h"
#include "vec/data_types/data_type_nullable.h"
#include "vec/data_types/data_type_struct.h"
+#include "vec/exec/format/orc/orc_memory_pool.h"
#include "vec/exec/format/table/transactional_hive_common.h"
#include "vec/exprs/vbloom_predicate.h"
#include "vec/exprs/vdirect_in_predicate.h"
@@ -252,6 +253,7 @@ Status OrcReader::_create_file_reader() {
// create orc reader
try {
orc::ReaderOptions options;
+ options.setMemoryPool(*ExecEnv::GetInstance()->orc_memory_pool());
_reader = orc::createReader(
std::unique_ptr<ORCFileInputStream>(_file_input_stream.release()), options);
} catch (std::exception& e) {
diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp
b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp
new file mode 100644
index 00000000000..ed06e5c821a
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.cpp
@@ -0,0 +1,74 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/exec/format/parquet/arrow_memory_pool.h"
+
+#include "glog/logging.h"
+
+namespace doris::vectorized {
+
+// A static piece of memory for 0-size allocations, so as to return
+// an aligned non-null pointer. Note the correct value for DebugAllocator
+// checks is hardcoded.
+alignas(kDefaultBufferAlignment) int64_t zero_size_area[1] = {kDebugXorSuffix};
+
+arrow::Status ArrowAllocator::allocate_aligned(int64_t size, int64_t
alignment, uint8_t** out) {
+ if (size == 0) {
+ *out = kZeroSizeArea;
+ return arrow::Status::OK();
+ }
+ *out = reinterpret_cast<uint8_t*>(_allocator.alloc(size, alignment));
+ if (*out == nullptr) {
+ return arrow::Status::OutOfMemory("malloc of size ", size, " failed");
+ }
+ return arrow::Status::OK();
+}
+
+arrow::Status ArrowAllocator::reallocate_aligned(int64_t old_size, int64_t
new_size,
+ int64_t alignment, uint8_t**
ptr) {
+ uint8_t* previous_ptr = *ptr;
+ if (previous_ptr == kZeroSizeArea) {
+ DCHECK_EQ(old_size, 0);
+ return allocate_aligned(new_size, alignment, ptr);
+ }
+ if (new_size == 0) {
+ deallocate_aligned(previous_ptr, old_size, alignment);
+ *ptr = kZeroSizeArea;
+ return arrow::Status::OK();
+ }
+ *ptr = reinterpret_cast<uint8_t*>(_allocator.realloc(*ptr,
static_cast<size_t>(old_size),
+
static_cast<size_t>(new_size), alignment));
+ if (*ptr == nullptr) {
+ *ptr = previous_ptr;
+ return arrow::Status::OutOfMemory("realloc of size ", new_size, "
failed");
+ }
+ return arrow::Status::OK();
+}
+
+void ArrowAllocator::deallocate_aligned(uint8_t* ptr, int64_t size, int64_t
alignment) {
+ if (ptr == kZeroSizeArea) {
+ DCHECK_EQ(size, 0);
+ } else {
+ _allocator.free(ptr, static_cast<size_t>(size));
+ }
+}
+
+void ArrowAllocator::release_unused() {
+ _allocator.release_unused();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/format/parquet/arrow_memory_pool.h
b/be/src/vec/exec/format/parquet/arrow_memory_pool.h
new file mode 100644
index 00000000000..a93e426f374
--- /dev/null
+++ b/be/src/vec/exec/format/parquet/arrow_memory_pool.h
@@ -0,0 +1,199 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "vec/common/allocator.h"
+#include "vec/common/allocator_fwd.h"
+
+namespace doris::vectorized {
+
+constexpr int64_t kDefaultBufferAlignment = 64;
+static constexpr int64_t kDebugXorSuffix = -0x181fe80e0b464188LL;
+#ifndef NDEBUG
+static constexpr uint8_t kAllocPoison = 0xBC;
+static constexpr uint8_t kReallocPoison = 0xBD;
+static constexpr uint8_t kDeallocPoison = 0xBE;
+#endif
+
+// A static piece of memory for 0-size allocations, so as to return
+// an aligned non-null pointer. Note the correct value for DebugAllocator
+// checks is hardcoded.
+extern int64_t zero_size_area[1];
+static uint8_t* const kZeroSizeArea =
reinterpret_cast<uint8_t*>(&zero_size_area);
+
+using ARROW_MEMORY_ALLOCATOR = DefaultMemoryAllocator;
+
+class ArrowAllocator {
+public:
+ arrow::Status allocate_aligned(int64_t size, int64_t alignment, uint8_t**
out);
+ arrow::Status reallocate_aligned(int64_t old_size, int64_t new_size,
int64_t alignment,
+ uint8_t** ptr);
+ void deallocate_aligned(uint8_t* ptr, int64_t size, int64_t alignment);
+ void release_unused();
+
+private:
+ Allocator<false, false, false, ARROW_MEMORY_ALLOCATOR> _allocator;
+};
+
+///////////////////////////////////////////////////////////////////////
+// Helper tracking memory statistics
+
+/// \brief Memory pool statistics
+///
+/// 64-byte aligned so that all atomic values are on the same cache line.
+class alignas(64) ArrowMemoryPoolStats {
+private:
+ // All atomics are updated according to Acquire-Release ordering.
+ //
https://en.cppreference.com/w/cpp/atomic/memory_order#Release-Acquire_ordering
+ //
+ // max_memory_, total_allocated_bytes_, and num_allocs_ only go up (they
are
+ // monotonically increasing) which can allow some optimizations.
+ std::atomic<int64_t> max_memory_ {0};
+ std::atomic<int64_t> bytes_allocated_ {0};
+ std::atomic<int64_t> total_allocated_bytes_ {0};
+ std::atomic<int64_t> num_allocs_ {0};
+
+public:
+ int64_t max_memory() const { return
max_memory_.load(std::memory_order_acquire); }
+
+ int64_t bytes_allocated() const { return
bytes_allocated_.load(std::memory_order_acquire); }
+
+ int64_t total_bytes_allocated() const {
+ return total_allocated_bytes_.load(std::memory_order_acquire);
+ }
+
+ int64_t num_allocations() const { return
num_allocs_.load(std::memory_order_acquire); }
+
+ inline void did_allocate_bytes(int64_t size) {
+ // Issue the load before everything else. max_memory_ is monotonically
increasing,
+ // so we can use a relaxed load before the read-modify-write.
+ auto max_memory = max_memory_.load(std::memory_order_relaxed);
+ const auto old_bytes_allocated =
+ bytes_allocated_.fetch_add(size, std::memory_order_acq_rel);
+ // Issue store operations on values that we don't depend on to proceed
+ // with execution. When done, max_memory and old_bytes_allocated have
+ // a higher chance of being available on CPU registers. This also has
the
+ // nice side-effect of putting 3 atomic stores close to each other in
the
+ // instruction stream.
+ total_allocated_bytes_.fetch_add(size, std::memory_order_acq_rel);
+ num_allocs_.fetch_add(1, std::memory_order_acq_rel);
+
+ // If other threads are updating max_memory_ concurrently we leave the
loop without
+ // updating knowing that it already reached a value even higher than
ours.
+ const auto allocated = old_bytes_allocated + size;
+ while (max_memory < allocated &&
+ !max_memory_.compare_exchange_weak(
+ /*expected=*/max_memory, /*desired=*/allocated,
std::memory_order_acq_rel)) {
+ }
+ }
+
+ inline void did_reallocate_bytes(int64_t old_size, int64_t new_size) {
+ if (new_size > old_size) {
+ did_allocate_bytes(new_size - old_size);
+ } else {
+ did_free_bytes(old_size - new_size);
+ }
+ }
+
+ inline void did_free_bytes(int64_t size) {
+ bytes_allocated_.fetch_sub(size, std::memory_order_acq_rel);
+ }
+};
+
+template <typename Allocator = ArrowAllocator>
+class ArrowMemoryPool : public arrow::MemoryPool {
+public:
+ ~ArrowMemoryPool() override = default;
+
+ arrow::Status Allocate(int64_t size, int64_t alignment, uint8_t** out)
override {
+ if (size < 0) {
+ return arrow::Status::Invalid("negative malloc size");
+ }
+ if (static_cast<uint64_t>(size) >= std::numeric_limits<size_t>::max())
{
+ return arrow::Status::OutOfMemory("malloc size overflows size_t");
+ }
+ RETURN_NOT_OK(_allocator.allocate_aligned(size, alignment, out));
+#ifndef NDEBUG
+ // Poison data
+ if (size > 0) {
+ DCHECK_NE(*out, nullptr);
+ (*out)[0] = kAllocPoison;
+ (*out)[size - 1] = kAllocPoison;
+ }
+#endif
+
+ _stats.did_allocate_bytes(size);
+ return arrow::Status::OK();
+ }
+
+ arrow::Status Reallocate(int64_t old_size, int64_t new_size, int64_t
alignment,
+ uint8_t** ptr) override {
+ if (new_size < 0) {
+ return arrow::Status::Invalid("negative realloc size");
+ }
+ if (static_cast<uint64_t>(new_size) >=
std::numeric_limits<size_t>::max()) {
+ return arrow::Status::OutOfMemory("realloc overflows size_t");
+ }
+ RETURN_NOT_OK(_allocator.reallocate_aligned(old_size, new_size,
alignment, ptr));
+#ifndef NDEBUG
+ // Poison data
+ if (new_size > old_size) {
+ DCHECK_NE(*ptr, nullptr);
+ (*ptr)[old_size] = kReallocPoison;
+ (*ptr)[new_size - 1] = kReallocPoison;
+ }
+#endif
+
+ _stats.did_reallocate_bytes(old_size, new_size);
+ return arrow::Status::OK();
+ }
+
+ void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
+#ifndef NDEBUG
+ // Poison data
+ if (size > 0) {
+ DCHECK_NE(buffer, nullptr);
+ buffer[0] = kDeallocPoison;
+ buffer[size - 1] = kDeallocPoison;
+ }
+#endif
+ _allocator.deallocate_aligned(buffer, size, alignment);
+
+ _stats.did_free_bytes(size);
+ }
+
+ void ReleaseUnused() override { _allocator.release_unused(); }
+
+ int64_t bytes_allocated() const override { return
_stats.bytes_allocated(); }
+
+ int64_t max_memory() const override { return _stats.max_memory(); }
+
+ int64_t total_bytes_allocated() const override { return
_stats.total_bytes_allocated(); }
+
+ int64_t num_allocations() const override { return
_stats.num_allocations(); }
+
+ std::string backend_name() const override { return "ArrowMemoryPool"; }
+
+protected:
+ ArrowMemoryPoolStats _stats;
+ Allocator _allocator;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/runtime/vorc_transformer.cpp
b/be/src/vec/runtime/vorc_transformer.cpp
index 09bae276d65..6c512a94373 100644
--- a/be/src/vec/runtime/vorc_transformer.cpp
+++ b/be/src/vec/runtime/vorc_transformer.cpp
@@ -31,6 +31,7 @@
#include "orc/OrcFile.hh"
#include "orc/Vector.hh"
#include "runtime/define_primitive_type.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/binary_cast.hpp"
@@ -151,6 +152,7 @@ Status VOrcTransformer::open() {
_output_stream = std::make_unique<VOrcOutputStream>(_file_writer);
try {
+
_write_options->setMemoryPool(ExecEnv::GetInstance()->orc_memory_pool());
_writer = orc::createWriter(*_schema, _output_stream.get(),
*_write_options);
} catch (const std::exception& e) {
return Status::InternalError("failed to create writer: {}", e.what());
@@ -314,15 +316,15 @@ int64_t VOrcTransformer::written_len() {
}
+// Closes the ORC writer (if one was created) and then the output stream (if
+// one was created). A std::exception thrown by either close is returned as
+// Status::IOError.
Status VOrcTransformer::close() {
- if (_writer != nullptr) {
- try {
+ try {
+ if (_writer != nullptr) {
_writer->close();
- } catch (const std::exception& e) {
- return Status::IOError(e.what());
}
- }
- if (_output_stream) {
- _output_stream->close();
+ if (_output_stream) {
+ _output_stream->close();
+ }
+ // One handler covers both close calls, so an exception from the output
+ // stream is also reported instead of escaping this function.
+ } catch (const std::exception& e) {
+ return Status::IOError(e.what());
}
return Status::OK();
}
@@ -353,13 +355,13 @@ Status VOrcTransformer::write(const Block& block) {
RETURN_IF_ERROR(_serdes[i]->write_column_to_orc(
_state->timezone(), *raw_column, nullptr, root->fields[i],
0, sz, buffer_list));
}
+ root->numElements = sz;
+ _writer->add(*row_batch);
+ _cur_written_rows += sz;
} catch (const std::exception& e) {
LOG(WARNING) << "Orc write error: " << e.what();
return Status::InternalError(e.what());
}
- root->numElements = sz;
- _writer->add(*row_batch);
- _cur_written_rows += sz;
return Status::OK();
}
diff --git a/be/src/vec/runtime/vparquet_transformer.cpp
b/be/src/vec/runtime/vparquet_transformer.cpp
index 116a898c4f1..1969858349f 100644
--- a/be/src/vec/runtime/vparquet_transformer.cpp
+++ b/be/src/vec/runtime/vparquet_transformer.cpp
@@ -42,6 +42,7 @@
#include "olap/olap_common.h"
#include "runtime/decimalv2_value.h"
#include "runtime/define_primitive_type.h"
+#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "util/arrow/block_convertor.h"
@@ -237,6 +238,7 @@ VParquetTransformer::VParquetTransformer(RuntimeState*
state, doris::io::FileWri
Status VParquetTransformer::_parse_properties() {
try {
+ arrow::MemoryPool* pool = ExecEnv::GetInstance()->arrow_memory_pool();
parquet::WriterProperties::Builder builder;
ParquetBuildHelper::build_compression_type(builder, _compression_type);
ParquetBuildHelper::build_version(builder, _parquet_version);
@@ -248,6 +250,7 @@ Status VParquetTransformer::_parse_properties() {
builder.created_by(
fmt::format("{}({})", doris::get_short_version(),
parquet::DEFAULT_CREATED_BY));
builder.max_row_group_length(std::numeric_limits<int64_t>::max());
+ builder.memory_pool(pool);
_parquet_writer_properties = builder.build();
_arrow_properties = parquet::ArrowWriterProperties::Builder()
.enable_deprecated_int96_timestamps()
@@ -292,8 +295,9 @@ Status VParquetTransformer::write(const Block& block) {
// serialize
std::shared_ptr<arrow::RecordBatch> result;
- RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
arrow::default_memory_pool(),
- &result, _state->timezone_obj()));
+ RETURN_IF_ERROR(convert_to_arrow_batch(block, _arrow_schema,
+
ExecEnv::GetInstance()->arrow_memory_pool(), &result,
+ _state->timezone_obj()));
RETURN_DORIS_STATUS_IF_ERROR(_writer->WriteRecordBatch(*result));
_write_size += block.bytes();
@@ -305,9 +309,10 @@ Status VParquetTransformer::write(const Block& block) {
}
+// Opens the parquet::arrow::FileWriter on _outstream using _arrow_schema,
+// the parquet/arrow writer properties, and the ExecEnv arrow memory pool.
+// If FileWriter::Open fails, ARROW_ASSIGN_OR_RAISE returns its error status.
arrow::Status VParquetTransformer::_open_file_writer() {
- ARROW_ASSIGN_OR_RAISE(_writer, parquet::arrow::FileWriter::Open(
- *_arrow_schema,
arrow::default_memory_pool(), _outstream,
- _parquet_writer_properties,
_arrow_properties));
+ ARROW_ASSIGN_OR_RAISE(_writer,
+ parquet::arrow::FileWriter::Open(
+ *_arrow_schema,
ExecEnv::GetInstance()->arrow_memory_pool(),
+ _outstream, _parquet_writer_properties,
_arrow_properties));
return arrow::Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]