This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ea39daa2d32 [Fix](stream-load) Fix stream load stuck under high 
concurrency (#36772)
ea39daa2d32 is described below

commit ea39daa2d32ba9466a969cf7d39090491bfbc62c
Author: Xin Liao <[email protected]>
AuthorDate: Tue Jun 25 13:39:45 2024 +0800

    [Fix](stream-load) Fix stream load stuck under high concurrency (#36772)
    
    ## Proposed changes
    
    When the number of concurrent stream loads exceeds the number of threads
    in the remote scanner, stream load may get stuck. The cause is that the
    libevent thread blocks while waiting for its stream load to complete;
    when the tasks being handled by the scanner threads do not overlap with
    the loads the blocked libevent threads are waiting on, neither side can
    make progress and the load hangs.
    The fix replaces this synchronous wait in the libevent thread with
    asynchronous execution: the stream load executor thread invokes a
    callback once the plan fragment finishes, and that callback commits (or
    pre-commits) the transaction and sends the HTTP reply.
---
 be/src/http/action/stream_load.cpp                 | 93 +++++++++++++---------
 be/src/http/action/stream_load.h                   |  4 +-
 .../runtime/stream_load/stream_load_executor.cpp   | 11 ++-
 be/src/runtime/stream_load/stream_load_executor.h  |  5 ++
 4 files changed, 72 insertions(+), 41 deletions(-)

diff --git a/be/src/http/action/stream_load.cpp 
b/be/src/http/action/stream_load.cpp
index eb8d364cf59..0f14270da02 100644
--- a/be/src/http/action/stream_load.cpp
+++ b/be/src/http/action/stream_load.cpp
@@ -33,6 +33,7 @@
 #include <time.h>
 
 #include <algorithm>
+#include <functional>
 #include <future>
 #include <map>
 #include <sstream>
@@ -108,20 +109,67 @@ void StreamLoadAction::handle(HttpRequest* req) {
 
     // status already set to fail
     if (ctx->status.ok()) {
-        ctx->status = _handle(ctx);
+        ctx->status = _handle(ctx, req);
         if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
             LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
                          << ", errmsg=" << ctx->status;
+            _send_reply(ctx, req);
         }
     }
+}
+
+Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx, 
HttpRequest* req) {
+    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
+        LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
+                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
+        return Status::InternalError("receive body don't equal with body 
bytes");
+    }
+
+    // if we use non-streaming, MessageBodyFileSink.finish will close the file
+    RETURN_IF_ERROR(ctx->body_sink->finish());
+    if (!ctx->use_streaming) {
+        // we need to close file first, then execute_plan_fragment here
+        ctx->body_sink.reset();
+        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(
+                ctx,
+                [req, this](std::shared_ptr<StreamLoadContext> ctx) { 
_on_finish(ctx, req); }));
+    }
+
+    return Status::OK();
+}
+
+void StreamLoadAction::_on_finish(std::shared_ptr<StreamLoadContext> ctx, 
HttpRequest* req) {
+    ctx->status = ctx->future.get();
+    if (ctx->status.ok()) {
+        if (ctx->group_commit) {
+            LOG(INFO) << "skip commit because this is group commit, pipe_id="
+                      << ctx->id.to_string();
+        } else if (ctx->two_phase_commit) {
+            int64_t pre_commit_start_time = MonotonicNanos();
+            ctx->status = 
_exec_env->stream_load_executor()->pre_commit_txn(ctx.get());
+            ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
+        } else {
+            // If put file success we need commit this load
+            int64_t commit_and_publish_start_time = MonotonicNanos();
+            ctx->status = 
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+            ctx->commit_and_publish_txn_cost_nanos =
+                    MonotonicNanos() - commit_and_publish_start_time;
+        }
+    }
+    _send_reply(ctx, req);
+}
+
+void StreamLoadAction::_send_reply(std::shared_ptr<StreamLoadContext> ctx, 
HttpRequest* req) {
     ctx->load_cost_millis = UnixMillis() - ctx->start_millis;
 
     if (!ctx->status.ok() && !ctx->status.is<PUBLISH_TIMEOUT>()) {
+        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                     << ", errmsg=" << ctx->status;
         if (ctx->need_rollback) {
             _exec_env->stream_load_executor()->rollback_txn(ctx.get());
             ctx->need_rollback = false;
         }
-        if (ctx->body_sink.get() != nullptr) {
+        if (ctx->body_sink != nullptr) {
             ctx->body_sink->cancel(ctx->status.to_string());
         }
     }
@@ -141,42 +189,6 @@ void StreamLoadAction::handle(HttpRequest* req) {
     streaming_load_duration_ms->increment(ctx->load_cost_millis);
 }
 
-Status StreamLoadAction::_handle(std::shared_ptr<StreamLoadContext> ctx) {
-    if (ctx->body_bytes > 0 && ctx->receive_bytes != ctx->body_bytes) {
-        LOG(WARNING) << "recevie body don't equal with body bytes, 
body_bytes=" << ctx->body_bytes
-                     << ", receive_bytes=" << ctx->receive_bytes << ", id=" << 
ctx->id;
-        return Status::InternalError("receive body don't equal with body 
bytes");
-    }
-
-    // if we use non-streaming, MessageBodyFileSink.finish will close the file
-    RETURN_IF_ERROR(ctx->body_sink->finish());
-    if (!ctx->use_streaming) {
-        // we need to close file first, then execute_plan_fragment here
-        ctx->body_sink.reset();
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(ctx));
-    }
-
-    // wait stream load finish
-    RETURN_IF_ERROR(ctx->future.get());
-
-    if (ctx->group_commit) {
-        LOG(INFO) << "skip commit because this is group commit, pipe_id=" << 
ctx->id.to_string();
-        return Status::OK();
-    }
-
-    if (ctx->two_phase_commit) {
-        int64_t pre_commit_start_time = MonotonicNanos();
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->pre_commit_txn(ctx.get()));
-        ctx->pre_commit_txn_cost_nanos = MonotonicNanos() - 
pre_commit_start_time;
-    } else {
-        // If put file success we need commit this load
-        int64_t commit_and_publish_start_time = MonotonicNanos();
-        
RETURN_IF_ERROR(_exec_env->stream_load_executor()->commit_txn(ctx.get()));
-        ctx->commit_and_publish_txn_cost_nanos = MonotonicNanos() - 
commit_and_publish_start_time;
-    }
-    return Status::OK();
-}
-
 int StreamLoadAction::on_header(HttpRequest* req) {
     streaming_load_current_processing->increment(1);
 
@@ -681,7 +693,10 @@ Status StreamLoadAction::_process_put(HttpRequest* 
http_req,
         return Status::OK();
     }
 
-    return _exec_env->stream_load_executor()->execute_plan_fragment(ctx);
+    return _exec_env->stream_load_executor()->execute_plan_fragment(
+            ctx, [http_req, this](std::shared_ptr<StreamLoadContext> ctx) {
+                _on_finish(ctx, http_req);
+            });
 }
 
 Status StreamLoadAction::_data_saved_path(HttpRequest* req, std::string* 
file_path) {
diff --git a/be/src/http/action/stream_load.h b/be/src/http/action/stream_load.h
index d1de89c9397..f91334e7305 100644
--- a/be/src/http/action/stream_load.h
+++ b/be/src/http/action/stream_load.h
@@ -46,11 +46,13 @@ public:
 
 private:
     Status _on_header(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
-    Status _handle(std::shared_ptr<StreamLoadContext> ctx);
+    Status _handle(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
     Status _data_saved_path(HttpRequest* req, std::string* file_path);
     Status _process_put(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
     void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx, 
const std::string& str);
     Status _handle_group_commit(HttpRequest* http_req, 
std::shared_ptr<StreamLoadContext> ctx);
+    void _on_finish(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
+    void _send_reply(std::shared_ptr<StreamLoadContext> ctx, HttpRequest* req);
 
 private:
     ExecEnv* _exec_env;
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp 
b/be/src/runtime/stream_load/stream_load_executor.cpp
index d26beb66827..62be00f1a7b 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -27,8 +27,10 @@
 #include <glog/logging.h>
 #include <stdint.h>
 
+#include <functional>
 #include <future>
 #include <map>
+#include <memory>
 #include <ostream>
 #include <string>
 #include <utility>
@@ -66,13 +68,19 @@ bvar::LatencyRecorder 
g_stream_load_precommit_txn_latency("stream_load", "precom
 bvar::LatencyRecorder g_stream_load_commit_txn_latency("stream_load", 
"commit_txn");
 
 Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadContext> 
ctx) {
+    return execute_plan_fragment(ctx, [](std::shared_ptr<StreamLoadContext> 
ctx) {});
+}
+
+Status StreamLoadExecutor::execute_plan_fragment(
+        std::shared_ptr<StreamLoadContext> ctx,
+        const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& cb) 
{
 // submit this params
 #ifndef BE_TEST
     ctx->start_write_data_nanos = MonotonicNanos();
     LOG(INFO) << "begin to execute stream load. label=" << ctx->label << ", 
txn_id=" << ctx->txn_id
               << ", query_id=" << ctx->id;
     Status st;
-    auto exec_fragment = [ctx, this](RuntimeState* state, Status* status) {
+    auto exec_fragment = [ctx, cb, this](RuntimeState* state, Status* status) {
         if (ctx->group_commit) {
             ctx->label = state->import_label();
             ctx->txn_id = state->wal_id();
@@ -142,6 +150,7 @@ Status 
StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
                   << (ctx->receive_and_read_data_cost_nanos - 
ctx->read_data_cost_nanos) / 1000000
                   << ", read_data_cost_ms=" << ctx->read_data_cost_nanos / 
1000000
                   << ", write_data_cost_ms=" << ctx->write_data_cost_nanos / 
1000000;
+        cb(ctx);
     };
 
     if (ctx->put_result.__isset.params) {
diff --git a/be/src/runtime/stream_load/stream_load_executor.h 
b/be/src/runtime/stream_load/stream_load_executor.h
index 1364bbbf31b..acdfe00a262 100644
--- a/be/src/runtime/stream_load/stream_load_executor.h
+++ b/be/src/runtime/stream_load/stream_load_executor.h
@@ -17,6 +17,7 @@
 
 #pragma once
 
+#include <functional>
 #include <memory>
 
 #include "common/factory_creator.h"
@@ -51,6 +52,10 @@ public:
 
     Status execute_plan_fragment(std::shared_ptr<StreamLoadContext> ctx);
 
+    Status execute_plan_fragment(
+            std::shared_ptr<StreamLoadContext> ctx,
+            const std::function<void(std::shared_ptr<StreamLoadContext> ctx)>& 
cb);
+
 protected:
     // collect the load statistics from context and set them to stat
     // return true if stat is set, otherwise, return false


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to