This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 86246f78bd0 [opt](bytebuffer) allocate ByteBuffer memory by Allocator
and make it exception safe (#38960)
86246f78bd0 is described below
commit 86246f78bd0973943fd7a2ff1159d4cd730a5d24
Author: hui lai <[email protected]>
AuthorDate: Mon Aug 12 11:17:43 2024 +0800
[opt](bytebuffer) allocate ByteBuffer memory by Allocator and make it
exception safe (#38960)
Previously, `ByteBuffer` allocated its memory directly with `new
char[capacity_]`. It is now allocated uniformly through `Allocator`, for the
following purposes:
1. Better memory statistics
2. Better support for memory limit checks
---
be/src/http/action/http_stream.cpp | 51 +++++++++++++---------
be/src/http/action/stream_load.cpp | 32 +++++++++-----
be/src/io/fs/stream_load_pipe.cpp | 7 ++-
be/src/runtime/exec_env.h | 4 ++
be/src/runtime/exec_env_init.cpp | 2 +
be/src/runtime/stream_load/stream_load_context.h | 8 +++-
.../runtime/stream_load/stream_load_executor.cpp | 4 ++
be/src/util/byte_buffer.h | 19 +++++---
8 files changed, 87 insertions(+), 40 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index e7bfa839111..afeb251ca41 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -234,31 +234,40 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
- auto bb = ByteBuffer::allocate(128 * 1024);
- auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
- bb->pos = remove_bytes;
- bb->flip();
- auto st = ctx->body_sink->append(bb);
- // schema_buffer stores 1M of data for parsing column information
- // need to determine whether to cache for the first time
- if (ctx->is_read_schema) {
- if (ctx->schema_buffer->pos + remove_bytes <
config::stream_tvf_buffer_size) {
- ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
- } else {
- LOG(INFO) << "use a portion of data to request fe to obtain
column information";
- ctx->is_read_schema = false;
- ctx->status = process_put(req, ctx);
+ try {
+ auto bb = ByteBuffer::allocate(128 * 1024);
+ auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+ bb->pos = remove_bytes;
+ bb->flip();
+ auto st = ctx->body_sink->append(bb);
+ // schema_buffer stores 1M of data for parsing column information
+ // need to determine whether to cache for the first time
+ if (ctx->is_read_schema) {
+ if (ctx->schema_buffer->pos + remove_bytes <
config::stream_tvf_buffer_size) {
+ ctx->schema_buffer->put_bytes(bb->ptr, remove_bytes);
+ } else {
+ LOG(INFO) << "use a portion of data to request fe to
obtain column information";
+ ctx->is_read_schema = false;
+ ctx->status = process_put(req, ctx);
+ }
}
+ if (!st.ok() && !ctx->status.ok()) {
+ LOG(WARNING) << "append body content failed. errmsg=" << st <<
", " << ctx->brief();
+ ctx->status = st;
+ return;
+ }
+ ctx->receive_bytes += remove_bytes;
+ } catch (const doris::Exception& e) {
+ if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
+ ctx->status = Status::MemoryLimitExceeded(
+ fmt::format("PreCatch error code:{}, {}, ", e.code(),
e.to_string()));
+ }
+ ctx->status = Status::Error<false>(e.code(), e.to_string());
}
-
- if (!st.ok() && !ctx->status.ok()) {
- LOG(WARNING) << "append body content failed. errmsg=" << st << ",
" << ctx->brief();
- ctx->status = st;
- return;
- }
- ctx->receive_bytes += remove_bytes;
}
// after all the data has been read and it has not reached 1M, it will
execute here
if (ctx->is_read_schema) {
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index 75d6943d3c6..d0c5dff2075 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -340,19 +340,29 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
- auto bb = ByteBuffer::allocate(128 * 1024);
- auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
- bb->pos = remove_bytes;
- bb->flip();
- auto st = ctx->body_sink->append(bb);
- if (!st.ok()) {
- LOG(WARNING) << "append body content failed. errmsg=" << st << ",
" << ctx->brief();
- ctx->status = st;
- return;
- }
- ctx->receive_bytes += remove_bytes;
+ try {
+ auto bb = ByteBuffer::allocate(128 * 1024);
+ auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
+ bb->pos = remove_bytes;
+ bb->flip();
+ auto st = ctx->body_sink->append(bb);
+ if (!st.ok()) {
+ LOG(WARNING) << "append body content failed. errmsg=" << st <<
", " << ctx->brief();
+ ctx->status = st;
+ return;
+ }
+ ctx->receive_bytes += remove_bytes;
+ } catch (const doris::Exception& e) {
+ if (e.code() == doris::ErrorCode::MEM_ALLOC_FAILED) {
+ ctx->status = Status::MemoryLimitExceeded(
+ fmt::format("PreCatch error code:{}, {}, ", e.code(),
e.to_string()));
+ }
+ ctx->status = Status::Error<false>(e.code(), e.to_string());
+ }
}
int64_t read_data_time = MonotonicNanos() - start_read_data_time;
int64_t last_receive_and_read_data_cost_nanos =
ctx->receive_and_read_data_cost_nanos;
diff --git a/be/src/io/fs/stream_load_pipe.cpp
b/be/src/io/fs/stream_load_pipe.cpp
index 21c3856a815..ce91a2e8391 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -111,7 +111,9 @@ Status
StreamLoadPipe::read_one_message(std::unique_ptr<uint8_t[]>* data, size_t
}
Status StreamLoadPipe::append_and_flush(const char* data, size_t size, size_t
proto_byte_size) {
- ByteBufferPtr buf = ByteBuffer::allocate(BitUtil::RoundUpToPowerOfTwo(size
+ 1));
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+ ByteBufferPtr buf;
+ RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(buf,
128 * 1024));
buf->put_bytes(data, size);
buf->flip();
return _append(buf, proto_byte_size);
@@ -145,7 +147,8 @@ Status StreamLoadPipe::append(const char* data, size_t
size) {
// need to allocate a new chunk, min chunk is 64k
size_t chunk_size = std::max(_min_chunk_size, size - pos);
chunk_size = BitUtil::RoundUpToPowerOfTwo(chunk_size);
- _write_buf = ByteBuffer::allocate(chunk_size);
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(ByteBuffer::create_and_allocate(_write_buf,
chunk_size));
_write_buf->put_bytes(data + pos, size - pos);
return Status::OK();
}
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 04bd5aa672a..e686df2dfd6 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -180,6 +180,9 @@ public:
std::shared_ptr<MemTrackerLimiter> segcompaction_mem_tracker() {
return _segcompaction_mem_tracker;
}
+ std::shared_ptr<MemTrackerLimiter> stream_load_pipe_tracker() {
+ return _stream_load_pipe_tracker;
+ }
std::shared_ptr<MemTrackerLimiter> point_query_executor_mem_tracker() {
return _point_query_executor_mem_tracker;
}
@@ -358,6 +361,7 @@ private:
std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
// Count the memory consumption of segment compaction tasks.
std::shared_ptr<MemTrackerLimiter> _segcompaction_mem_tracker;
+ std::shared_ptr<MemTrackerLimiter> _stream_load_pipe_tracker;
// Tracking memory may be shared between multiple queries.
std::shared_ptr<MemTrackerLimiter> _point_query_executor_mem_tracker;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 8c308f4b4d8..d160e9abdc2 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -616,6 +616,8 @@ void ExecEnv::init_mem_tracker() {
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"SubcolumnsTree");
_s3_file_buffer_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"S3FileBuffer");
+ _stream_load_pipe_tracker =
+ MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::GLOBAL,
"StreamLoadPipe");
}
void ExecEnv::_register_metrics() {
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index 633c3af428b..f7c4a0d474f 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -37,6 +37,7 @@
#include "common/utils.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/thread_context.h"
#include "util/byte_buffer.h"
#include "util/time.h"
#include "util/uid_util.h"
@@ -95,9 +96,14 @@ class StreamLoadContext {
public:
StreamLoadContext(ExecEnv* exec_env) : id(UniqueId::gen_uid()),
_exec_env(exec_env) {
start_millis = UnixMillis();
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
+ schema_buffer = ByteBuffer::allocate(config::stream_tvf_buffer_size);
}
~StreamLoadContext() {
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(
+ ExecEnv::GetInstance()->stream_load_pipe_tracker());
+ schema_buffer.reset();
if (need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(this);
need_rollback = false;
@@ -184,7 +190,7 @@ public:
std::shared_ptr<MessageBodySink> body_sink;
std::shared_ptr<io::StreamLoadPipe> pipe;
- ByteBufferPtr schema_buffer =
ByteBuffer::allocate(config::stream_tvf_buffer_size);
+ ByteBufferPtr schema_buffer;
TStreamLoadPutResult put_result;
TStreamLoadMultiTablePutResult multi_table_put_result;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index 28b0556aafd..4ddd29ac9c3 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -142,6 +142,10 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
<< ", write_data_cost_ms=" << ctx->write_data_cost_nanos /
1000000;
};
+ // Reset thread memory tracker, otherwise SCOPED_ATTACH_TASK will be
called nested, nesting is
+ // not allowed, first time in on_chunk_data, second time in
StreamLoadExecutor::execute_plan_fragment.
+
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
+
if (ctx->put_result.__isset.params) {
st =
_exec_env->fragment_mgr()->exec_plan_fragment(ctx->put_result.params,
exec_fragment);
} else {
diff --git a/be/src/util/byte_buffer.h b/be/src/util/byte_buffer.h
index aab8fd42db6..e8eadf69e02 100644
--- a/be/src/util/byte_buffer.h
+++ b/be/src/util/byte_buffer.h
@@ -23,19 +23,27 @@
#include <memory>
#include "common/logging.h"
+#include "common/status.h"
+#include "vec/common/allocator.h"
+#include "vec/common/allocator_fwd.h"
namespace doris {
struct ByteBuffer;
using ByteBufferPtr = std::shared_ptr<ByteBuffer>;
-struct ByteBuffer {
+struct ByteBuffer : private Allocator<false> {
static ByteBufferPtr allocate(size_t size) {
ByteBufferPtr ptr(new ByteBuffer(size));
return ptr;
}
- ~ByteBuffer() { delete[] ptr; }
+ static Status create_and_allocate(ByteBufferPtr& ptr, size_t size) {
+ ptr = ByteBufferPtr(new ByteBuffer(size));
+ return Status::OK();
+ }
+
+ ~ByteBuffer() { Allocator<false>::free(ptr, capacity); }
void put_bytes(const char* data, size_t size) {
memcpy(ptr + pos, data, size);
@@ -56,14 +64,15 @@ struct ByteBuffer {
size_t remaining() const { return limit - pos; }
bool has_remaining() const { return limit > pos; }
- char* const ptr;
+ char* ptr;
size_t pos;
size_t limit;
size_t capacity;
private:
- ByteBuffer(size_t capacity_)
- : ptr(new char[capacity_]), pos(0), limit(capacity_),
capacity(capacity_) {}
+ ByteBuffer(size_t capacity_) : pos(0), limit(capacity_),
capacity(capacity_) {
+ ptr = reinterpret_cast<char*>(Allocator<false>::alloc(capacity_));
+ }
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]