This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ea39daa2d32 [Fix](stream-load) Fix stream load stuck under high
concurrency (#36772)
ea39daa2d32 is described below
commit ea39daa2d32ba9466a969cf7d39090491bfbc62c
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jun 25 13:39:45 2024 +0800
[Fix](stream-load) Fix stream load stuck under high concurrency (#36772)
## Proposed changes
When the number of concurrent stream loads exceeds the number of threads
in the remote scanner, stream load may get stuck. The reason is that each
libevent thread blocks while waiting for its stream load to complete, so
when the tasks handled by the scanner threads and the tasks handled by
the libevent threads do not overlap, the load gets stuck.
The solution is to make the libevent thread's synchronous wait
asynchronous: instead of blocking until the load completes, the
follow-up work is run from a callback in the stream load executor thread.
---
be/src/http/action/stream_load.cpp | 93 +++++++++++++---------
be/src/http/action/stream_load.h | 4 +-
.../runtime/stream_load/stream_load_executor.cpp | 11 ++-
be/src/runtime/stream_load/stream_load_executor.h | 5 ++
4 files changed, 72 insertions(+), 41 deletions(-)
diff --git a/be/src/http/action/stream_load.cpp
b/be/src/http/action/stream_load.cpp
index eb8d364cf59..0f14270da02 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -33,6 +33,7 @@
#include <time.h>
#include <algorithm>
+#include <functional>
#include <future>
#include <map>
#include <sstream>
@@ -108,20 +109,67 @@ void StreamLoadAction::handle(HttpRequest* req) {
// status already set to fail
if (ctx->status.ok()) {
- ctx->status = _handle(ctx);
+ ctx->status = _handle(ctx, req);
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
<< ", errmsg=" << ctx->status;
+ _send_reply(ctx, req);
}
}
+}
+
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx,
HttpRequest* req) {
+ if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+ LOG(WARNING) << "recevie body don't equal with body bytes,
body_bytes=" << ctx->body_bytes
+ << ", receive_bytes=" << ctx->receive_bytes << ", id=" <<
ctx->id;
+ return Status::InternalError("receive body don't equal with body
bytes");
+ }
+
+ // if we use non-streaming, MessageBodyFileSink.finish will close the file
+ RETURN_IF_ERROR(ctx->body_sink->finish());
+ if (!ctx->use_streaming) {
+ // we need to close file first, then execute_plan_fragment here
+ ctx->body_sink.reset();
+
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
+ ctx,
+ [req, this](std::shared_ptr<StreamLoadContext> ctx) {
_on_finish(ctx, req); }));
+ }
+
+ return Status::OK();
+}
+
+void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx,
HttpRequest* req) {
+ ctx->status = ctx->future.get();
+ if (ctx->status.ok()) {
+ if (ctx->group_commit) {
+ LOG(INFO) << "skip commit because this is group commit, pipe_id="
+ << ctx->id.to_string();
+ } else if (ctx->two_phase_commit) {
+ int64_t pre_commit_start_time = MonotonicNanos();
+ ctx->status =
_exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+ ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
+ } else {
+ // If put file success we need commit this load
+ int64_t commit_and_publish_start_time = MonotonicNanos();
+ ctx->status =
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+ ctx->commit_and_publish_txn_cost_nanos =
+ MonotonicNanos() - commit_and_publish_start_time;
+ }
+ }
+ _send_reply(ctx, req);
+}
+
+void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx,
HttpRequest* req) {
ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+ LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+ << ", errmsg=" << ctx->status;
if (ctx->need_rollback) {
_exec_env->stream_load_executor()->rollback_txn(ctx.get());
ctx->need_rollback = false;
}
- if (ctx->body_sink.get() != nullptr) {
+ if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(ctx->status.to_string());
}
}
@@ -141,42 +189,6 @@ void StreamLoadAction::handle(HttpRequest* req) {
streaming_load_duration_ms->increment(ctx->load_cost_millis);
}
-Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
- if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
- LOG(WARNING) << "recevie body don't equal with body bytes,
body_bytes=" << ctx->body_bytes
- << ", receive_bytes=" << ctx->receive_bytes << ", id=" <<
ctx->id;
- return Status::InternalError("receive body don't equal with body
bytes");
- }
-
- // if we use non-streaming, MessageBodyFileSink.finish will close the file
- RETURN_IF_ERROR(ctx->body_sink->finish());
- if (!ctx->use_streaming) {
- // we need to close file first, then execute_plan_fragment here
- ctx->body_sink.reset();
-
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx));
- }
-
- // wait stream load finish
- RETURN_IF_ERROR(ctx->future.get());
-
- if (ctx->group_commit) {
- LOG(INFO) << "skip commit because this is group commit, pipe_id=" <<
ctx->id.to_string();
- return Status::OK();
- }
-
- if (ctx->two_phase_commit) {
- int64_t pre_commit_start_time = MonotonicNanos();
-
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
- ctx->pre_commit_txn_cost_nanos = MonotonicNanos() -
pre_commit_start_time;
- } else {
- // If put file success we need commit this load
- int64_t commit_and_publish_start_time = MonotonicNanos();
-
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
- ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() -
commit_and_publish_start_time;
- }
- return Status::OK();
-}
-
int StreamLoadAction::on_header(HttpRequest* req) {
streaming_load_current_processing->increment(1);
@@ -681,7 +693,10 @@ Status StreamLoadAction::_process_put(HttpRequest*
http_req,
return Status::OK();
}
- return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+ return _exec_env->stream_load_executor()->execute_plan_fragment(
+ ctx, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
+ _on_finish(ctx, http_req);
+ });
}
Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string*
file_path) {
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index d1de89c9397..f91334e7305 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -46,11 +46,13 @@ public:
private:
Status _on_header(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
- Status _handle(std::shared_ptr<StreamLoadContext> ctx);
+ Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
Status _data_saved_path(HttpRequest* req, std::string* file_path);
Status _process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str);
Status _handle_group_commit(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
+ void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
+ void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
private:
ExecEnv* _exec_env;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp
b/be/src/runtime/stream_load/stream_load_executor.cpp
index d26beb66827..62be00f1a7b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -27,8 +27,10 @@
#include <glog/logging.h>
#include <stdint.h>
+#include <functional>
#include <future>
#include <map>
+#include <memory>
#include <ostream>
#include <string>
#include <utility>
@@ -66,13 +68,19 @@ bvar::LatencyRecorder
g_stream_load_precommit_txn_latency("stream_load", "precom
bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load",
"commit_txn");
Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext>
ctx) {
+ return execute_plan_fragment(ctx, [](std::shared_ptr<StreamLoadContext>
ctx) {});
+}
+
+Status StreamLoadExecutor::execute_plan_fragment(
+ std::shared_ptr<StreamLoadContext> ctx,
+ const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& cb)
{
// submit this params
#ifndef BE_TEST
ctx->start_write_data_nanos = MonotonicNanos();
LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ",
txn_id=" << ctx->txn_id
<< ", query_id=" << ctx->id;
Status st;
- auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
+ auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) {
if (ctx->group_commit) {
ctx->label = state->import_label();
ctx->txn_id = state->wal_id();
@@ -142,6 +150,7 @@ Status
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
<< (ctx->receive_and_read_data_cost_nanos -
ctx->read_data_cost_nanos) / 1000000
<< ", read_data_cost_ms=" << ctx->read_data_cost_nanos /
1000000
<< ", write_data_cost_ms=" << ctx->write_data_cost_nanos /
1000000;
+ cb(ctx);
};
if (ctx->put_result.__isset.params) {
diff --git a/be/src/runtime/stream_load/stream_load_executor.h
b/be/src/runtime/stream_load/stream_load_executor.h
index 1364bbbf31b..acdfe00a262 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,6 +17,7 @@
#pragma once
+#include <functional>
#include <memory>
#include "common/factory_creator.h"
@@ -51,6 +52,10 @@ public:
Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
+ Status execute_plan_fragment(
+ std::shared_ptr<StreamLoadContext> ctx,
+ const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>&
cb);
+
protected:
// collect the load statistics from context and set them to stat
// return true if stat is set, otherwise, return false
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]