yiguolei commented on code in PR #52977:
URL: https://github.com/apache/doris/pull/52977#discussion_r2193916792
##########
be/src/http/action/stream_load.cpp:
##########
@@ -352,27 +352,29 @@ void StreamLoadAction::on_chunk_data(HttpRequest* req) {
struct evhttp_request* ev_req = req->get_evhttp_request();
auto evbuf = evhttp_request_get_input_buffer(ev_req);
+ // TODO: Add ByteBuffer as a member of StreamLoadContext
+ ByteBufferPtr bb;
+ if (Status status = ByteBuffer::allocate(128 * 1024, &bb); !status.ok()) {
+ ctx->status = status;
+ return;
+ }
+
SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->stream_load_pipe_tracker());
int64_t start_read_data_time = MonotonicNanos();
while (evbuffer_get_length(evbuf) > 0) {
- ByteBufferPtr bb;
- Status st = ByteBuffer::allocate(128 * 1024, &bb);
- if (!st.ok()) {
- ctx->status = st;
- return;
- }
auto remove_bytes = evbuffer_remove(evbuf, bb->ptr, bb->capacity);
bb->pos = remove_bytes;
bb->flip();
- st = ctx->body_sink->append(bb);
- if (!st.ok()) {
+
+ if (Status st = ctx->body_sink->append(bb); !st.ok()) {
Review Comment:
the buffer could not be reusable. because we have code like this.
Status StreamLoadPipe::_append(const ByteBufferPtr& buf, size_t
proto_byte_size) {
{
std::unique_lock<std::mutex> l(_lock);
// if _buf_queue is empty, we append this buf without size check
if (_use_proto) {
while (!_cancelled && !_buf_queue.empty() &&
(_proto_buffered_bytes + proto_byte_size >
_max_buffered_bytes)) {
_put_cond.wait(l);
}
} else {
while (!_cancelled && !_buf_queue.empty() &&
_buffered_bytes + buf->remaining() > _max_buffered_bytes)
{
_put_cond.wait(l);
}
}
if (_cancelled) {
return Status::Cancelled("cancelled: {}", _cancelled_reason);
}
_buf_queue.push_back(buf);
if (_use_proto) {
_proto_buffered_bytes += proto_byte_size;
} else {
_buffered_bytes += buf->remaining();
}
}
_get_cond.notify_one();
return Status::OK();
}
we will push the buffer directly to a queue.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]