github-actions[bot] commented on code in PR #29277:
URL: https://github.com/apache/doris/pull/29277#discussion_r1437989760


##########
be/src/olap/wal_manager.cpp:
##########
@@ -521,5 +527,80 @@
     }
     return wal_path.string();
 }
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_lock_map.find(wal_id);
+    if (it != _wal_lock_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    _wal_lock_map.emplace(wal_id, lock);
+    _wal_cv_map.emplace(wal_id, cv);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) {
+        LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
+    } else {
+        return Status::InternalError("fail to erase wal {} from wal_cv_map", 
wal_id);
+    }
+    return Status::OK();
+}
+Status WalManager::wait_relay_wal_finish(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        LOG(INFO) << "start wait " << wal_id;
+        if (cv->wait_for(l, std::chrono::seconds(180)) == 
std::cv_status::timeout) {
+            LOG(WARNING) << "wait for " << wal_id << " is time out";
+        }
+        LOG(INFO) << "get wal " << wal_id << ",finish wait";
+        RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
+        LOG(INFO) << "erase wal " << wal_id;
+    }
+    return Status::OK();
+}
+Status WalManager::notify(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        cv->notify_all();
+        LOG(INFO) << "get wal " << wal_id << ",notify all";
+    }
+    return Status::OK();
+}
+Status WalManager::get_lock_and_cv(int64_t wal_id, 
std::shared_ptr<std::mutex>& lock,

Review Comment:
   warning: method 'get_lock_and_cv' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalManager::get_lock_and_cv(int64_t wal_id, std::shared_ptr<std::mutex>& lock,
   ```
   



##########
be/src/olap/wal_manager.cpp:
##########
@@ -521,5 +527,80 @@
     }
     return wal_path.string();
 }
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,

Review Comment:
   warning: method 'add_wal_cv_map' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> lock,
   ```
   



##########
be/src/olap/wal_table.cpp:
##########
@@ -236,171 +244,108 @@
     return Status::OK();
 }
 
-void http_request_done(struct evhttp_request* req, void* arg) {
-    std::stringstream out;
-    std::string status;
-    std::string msg;
-    std::string wal_id;
-    size_t len = 0;
-    if (req != nullptr) {
-        auto input = evhttp_request_get_input_buffer(req);
-        char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-        while (request_line != nullptr) {
-            std::string s(request_line);
-            out << request_line;
-            request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-        }
-        auto out_str = out.str();
-        LOG(INFO) << "replay wal out_str:" << out_str;
-        rapidjson::Document doc;
-        if (!out_str.empty()) {
-            doc.Parse(out.str().c_str());
-            status = std::string(doc["Status"].GetString());
-            msg = std::string(doc["Message"].GetString());
-            LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
-            if (status.find("Fail") != status.npos) {
-                if (msg.find("Label") != msg.npos &&
-                    msg.find("has already been used") != msg.npos) {
-                    retry = false;
-                } else {
-                    retry = true;
-                }
-            } else {
-                retry = false;
-            }
-        } else {
-            retry = true;
-        }
-    } else {
-        LOG(WARNING) << "req is null";
-    }
-
-    if (arg != nullptr) {
-        event_base_loopbreak((struct event_base*)arg);
-    } else {
-        LOG(WARNING) << "arg is null";
-    }
-}
-
-Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const 
std::string& label) {
-#ifndef BE_TEST
-    struct event_base* base = nullptr;
-    struct evhttp_connection* conn = nullptr;
-    struct evhttp_request* req = nullptr;
-    retry = false;
-    event_init();
-    base = event_base_new();
-    conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port);
-    evhttp_connection_set_base(conn, base);
-    req = evhttp_request_new(http_request_done, base);
-    evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), 
label.c_str());
-    evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), 
std::to_string(wal_id).c_str());
-    evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), 
std::to_string(wal_id).c_str());
+Status WalTable::_construct_sql_str(const std::string& wal, const std::string& 
label,
+                                    std::string& sql_str, std::vector<size_t>& 
index_vector) {
     std::string columns;
     RETURN_IF_ERROR(_read_wal_header(wal, columns));
     std::vector<std::string> column_id_element;
     doris::vectorized::WalReader::string_split(columns, ",", 
column_id_element);
-    std::vector<size_t> index_vector;
     std::stringstream ss_name;
     std::stringstream ss_id;
     int index_raw = 0;
     for (auto column_id_str : column_id_element) {
         try {
             int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
-            auto it = _column_id_name_map.find(column_id);
-            auto it2 = _column_id_index_map.find(column_id);
-            if (it != _column_id_name_map.end() && it2 != 
_column_id_index_map.end()) {
-                ss_name << "`" << it->second << "`,";
-                ss_id << "c" << 
std::to_string(_column_id_index_map[column_id]) << ",";
+            auto it = _column_id_info_map.find(column_id);
+            if (it != _column_id_info_map.end()) {
+                ss_name << "`" << it->second->first << "`,";
+                ss_id << "c" << std::to_string(it->second->second) << ",";
                 index_vector.emplace_back(index_raw);
-                _column_id_name_map.erase(column_id);
-                _column_id_index_map.erase(column_id);
             }
             index_raw++;
         } catch (const std::invalid_argument& e) {
             return Status::InvalidArgument("Invalid format, {}", e.what());
         }
     }
-    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
     auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
     auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
     std::stringstream ss;
     ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL 
" << label << " ("
        << name << ") select " << id << " from http_stream(\"format\" = 
\"wal\", \"table_id\" = \""
        << std::to_string(_table_id) << "\")";
-    evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str());
-    evbuffer* output = evhttp_request_get_output_buffer(req);
-    evbuffer_add_printf(output, "replay wal %s", 
std::to_string(wal_id).c_str());
-
-    evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream");
-    evhttp_connection_set_timeout(req->evcon, 300);
-
-    event_base_dispatch(base);
-    evhttp_connection_free(conn);
-    event_base_free(base);
+    sql_str = ss.str().data();
+    return Status::OK();
+}
+Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
+                                     const std::string& label) {
+    std::string sql_str;
+    std::vector<size_t> index_vector;
+    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector));
+    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
+    std::shared_ptr<StreamLoadContext> ctx = 
std::make_shared<StreamLoadContext>(_exec_env);
+    ctx->sql_str = sql_str;
+    ctx->wal_id = wal_id;
+    ctx->auth.auth_code = wal_id;
+    ctx->label = label;
+    ctx->group_commit = false;
+    auto st = _http_stream_action->process_put(nullptr, ctx);
+    if (st.ok()) {
+        // wait stream load finish
+        RETURN_IF_ERROR(ctx->future.get());
+        if (ctx->status.ok()) {
+            auto commit_st = 
_exec_env->stream_load_executor()->commit_txn(ctx.get());
+            st = commit_st;
+        } else if (!ctx->status.ok()) {
+            LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                         << ", errmsg=" << ctx->status;
+            _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+            st = ctx->status;
+        }
+    }
+    if (config::wait_relay_wal_finish) {
+        RETURN_IF_ERROR(_exec_env->wal_mgr()->notify(wal_id));
+    }
+    _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
+    LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string();
+    return st;
+}
 
+Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const 
std::string& wal,

Review Comment:
   warning: method '_replay_one_txn_with_stremaload' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal,
   ```
   



##########
be/src/olap/wal_table.cpp:
##########
@@ -236,171 +244,108 @@
     return Status::OK();
 }
 
-void http_request_done(struct evhttp_request* req, void* arg) {
-    std::stringstream out;
-    std::string status;
-    std::string msg;
-    std::string wal_id;
-    size_t len = 0;
-    if (req != nullptr) {
-        auto input = evhttp_request_get_input_buffer(req);
-        char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-        while (request_line != nullptr) {
-            std::string s(request_line);
-            out << request_line;
-            request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
-        }
-        auto out_str = out.str();
-        LOG(INFO) << "replay wal out_str:" << out_str;
-        rapidjson::Document doc;
-        if (!out_str.empty()) {
-            doc.Parse(out.str().c_str());
-            status = std::string(doc["Status"].GetString());
-            msg = std::string(doc["Message"].GetString());
-            LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
-            if (status.find("Fail") != status.npos) {
-                if (msg.find("Label") != msg.npos &&
-                    msg.find("has already been used") != msg.npos) {
-                    retry = false;
-                } else {
-                    retry = true;
-                }
-            } else {
-                retry = false;
-            }
-        } else {
-            retry = true;
-        }
-    } else {
-        LOG(WARNING) << "req is null";
-    }
-
-    if (arg != nullptr) {
-        event_base_loopbreak((struct event_base*)arg);
-    } else {
-        LOG(WARNING) << "arg is null";
-    }
-}
-
-Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const 
std::string& label) {
-#ifndef BE_TEST
-    struct event_base* base = nullptr;
-    struct evhttp_connection* conn = nullptr;
-    struct evhttp_request* req = nullptr;
-    retry = false;
-    event_init();
-    base = event_base_new();
-    conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port);
-    evhttp_connection_set_base(conn, base);
-    req = evhttp_request_new(http_request_done, base);
-    evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), 
label.c_str());
-    evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), 
std::to_string(wal_id).c_str());
-    evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), 
std::to_string(wal_id).c_str());
+Status WalTable::_construct_sql_str(const std::string& wal, const std::string& 
label,

Review Comment:
   warning: method '_construct_sql_str' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label,
   ```
   



##########
be/src/olap/wal_manager.cpp:
##########
@@ -521,5 +527,80 @@
     }
     return wal_path.string();
 }
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_lock_map.find(wal_id);
+    if (it != _wal_lock_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    _wal_lock_map.emplace(wal_id, lock);
+    _wal_cv_map.emplace(wal_id, cv);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) {
+        LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
+    } else {
+        return Status::InternalError("fail to erase wal {} from wal_cv_map", 
wal_id);
+    }
+    return Status::OK();
+}
+Status WalManager::wait_relay_wal_finish(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        LOG(INFO) << "start wait " << wal_id;
+        if (cv->wait_for(l, std::chrono::seconds(180)) == 
std::cv_status::timeout) {
+            LOG(WARNING) << "wait for " << wal_id << " is time out";
+        }
+        LOG(INFO) << "get wal " << wal_id << ",finish wait";
+        RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
+        LOG(INFO) << "erase wal " << wal_id;
+    }
+    return Status::OK();
+}
+Status WalManager::notify(int64_t wal_id) {

Review Comment:
   warning: method 'notify' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal_manager.h:89:
   ```diff
   -     Status notify(int64_t wal_id);
   +     static Status notify(int64_t wal_id);
   ```
   



##########
be/src/olap/wal_manager.cpp:
##########
@@ -521,5 +527,80 @@
     }
     return wal_path.string();
 }
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_lock_map.find(wal_id);
+    if (it != _wal_lock_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    _wal_lock_map.emplace(wal_id, lock);
+    _wal_cv_map.emplace(wal_id, cv);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {

Review Comment:
   warning: method 'erase_wal_cv_map' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal_manager.h:84:
   ```diff
   -     Status erase_wal_cv_map(int64_t wal_id);
   +     static Status erase_wal_cv_map(int64_t wal_id);
   ```
   



##########
be/src/olap/wal_manager.cpp:
##########
@@ -521,5 +527,80 @@
     }
     return wal_path.string();
 }
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_lock_map.find(wal_id);
+    if (it != _wal_lock_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    _wal_lock_map.emplace(wal_id, lock);
+    _wal_cv_map.emplace(wal_id, cv);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) {
+        LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
+    } else {
+        return Status::InternalError("fail to erase wal {} from wal_cv_map", 
wal_id);
+    }
+    return Status::OK();
+}
+Status WalManager::wait_relay_wal_finish(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        LOG(INFO) << "start wait " << wal_id;
+        if (cv->wait_for(l, std::chrono::seconds(180)) == 
std::cv_status::timeout) {
+            LOG(WARNING) << "wait for " << wal_id << " is time out";
+        }
+        LOG(INFO) << "get wal " << wal_id << ",finish wait";
+        RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
+        LOG(INFO) << "erase wal " << wal_id;
+    }
+    return Status::OK();
+}
+Status WalManager::notify(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        cv->notify_all();
+        LOG(INFO) << "get wal " << wal_id << ",notify all";
+    }
+    return Status::OK();
+}
+Status WalManager::get_lock_and_cv(int64_t wal_id, 
std::shared_ptr<std::mutex>& lock,
+                                   std::shared_ptr<std::condition_variable>& 
cv) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto lock_it = _wal_lock_map.find(wal_id);
+    if (lock_it == _wal_lock_map.end()) {
+        return Status::InternalError("cannot find txn {} in wal_lock_map", 
wal_id);
+    }
+    lock = lock_it->second;
+    auto cv_it = _wal_cv_map.find(wal_id);
+    if (cv_it == _wal_cv_map.end()) {
+        return Status::InternalError("cannot find txn {} in wal_cv_map", 
wal_id);
+    }
+    cv = cv_it->second;
+    return Status::OK();
+}
+
+bool WalManager::find_wal_path(int64_t wal_id) {
+    std::shared_lock rdlock(_wal_lock);
+    auto it = _wal_path_map.find(wal_id);
+    if (it != _wal_path_map.end()) {
+        LOG(INFO) << "find wal path " << it->second;
+        return true;

Review Comment:
   warning: redundant boolean literal in conditional return statement [readability-simplify-boolean-expr]
   
   be/src/olap/wal_manager.cpp:596:
   ```diff
   -     if (it != _wal_path_map.end()) {
   -         LOG(INFO) << "find wal path " << it->second;
   -         return true;
   -     } else {
   -         LOG(INFO) << "not find wal path for " << wal_id;
   -         return false;
   -     }
   +     return it != _wal_path_map.end();
   ```
   



##########
be/src/olap/wal_manager.cpp:
##########
@@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() {
             &_update_wal_dirs_info_thread);
 }
 
-void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, 
WAL_STATUS wal_status) {
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, 
WalStatus wal_status) {

Review Comment:
   warning: method 'add_wal_status_queue' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal_manager.h:73:
   ```diff
   -     void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
   +     static void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status);
   ```
   



##########
be/src/olap/wal_table.cpp:
##########
@@ -41,125 +38,158 @@
 namespace doris {
 
 WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
-        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {}
+        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {
+    _http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
+}
 WalTable::~WalTable() {}
 
 #ifdef BE_TEST
-std::string k_request_line;
+Status k_stream_load_exec_status;
 #endif
 
-bool retry = false;
-
-void WalTable::add_wals(std::vector<std::string> wals) {
+void WalTable::add_wal(int64_t wal_id, std::string wal) {

Review Comment:
   warning: method 'add_wal' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static void WalTable::add_wal(int64_t wal_id, std::string wal) {
   ```
   



##########
be/src/olap/wal_table.cpp:
##########
@@ -41,125 +38,158 @@
 namespace doris {
 
 WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
-        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {}
+        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {
+    _http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
+}
 WalTable::~WalTable() {}
 
 #ifdef BE_TEST
-std::string k_request_line;
+Status k_stream_load_exec_status;
 #endif
 
-bool retry = false;
-
-void WalTable::add_wals(std::vector<std::string> wals) {
+void WalTable::add_wal(int64_t wal_id, std::string wal) {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
-    for (const auto& wal : wals) {
-        LOG(INFO) << "add replay wal " << wal;
-        _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
-    }
+    LOG(INFO) << "add replay wal " << wal;
+    auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
+    _replay_wal_map.emplace(wal, wal_info);
 }
-Status WalTable::replay_wals() {
+void WalTable::pick_relay_wals() {
+    std::lock_guard<std::mutex> lock(_replay_wal_lock);
     std::vector<std::string> need_replay_wals;
     std::vector<std::string> need_erase_wals;
-    {
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        if (_replay_wal_map.empty()) {
-            return Status::OK();
-        }
-        VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << 
_table_id
-                   << ", wal size=" << _replay_wal_map.size();
-        for (auto& [wal, info] : _replay_wal_map) {
-            auto& [retry_num, start_ts, replaying] = info;
-            if (replaying) {
-                LOG(INFO) << wal << " is replaying, skip this round";
-                return Status::OK();
-            }
-            if (retry_num >= config::group_commit_replay_wal_retry_num) {
-                LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", 
table=" << _table_id
-                             << ", wal=" << wal
-                             << ", retry_num=" << 
config::group_commit_replay_wal_retry_num;
-                std::string rename_path = _get_tmp_path(wal);
-                LOG(INFO) << "rename wal from " << wal << " to " << 
rename_path;
-                std::rename(wal.c_str(), rename_path.c_str());
-                need_erase_wals.push_back(wal);
-                continue;
+    for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) 
{
+        auto wal_info = it->second;
+        if (wal_info->get_retry_num() >= 
config::group_commit_replay_wal_retry_num) {
+            LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", 
table=" << _table_id
+                         << ", wal=" << it->first << ", retry_num=" << 
wal_info->get_retry_num();
+            auto st = _rename_to_tmp_path(it->first);
+            if (!st.ok()) {
+                LOG(WARNING) << "rename " << it->first << " fail"
+                             << ",st:" << st.to_string();
             }
-            if (_need_replay(info)) {
-                need_replay_wals.push_back(wal);
+            need_erase_wals.push_back(it->first);
+            if (config::wait_relay_wal_finish) {
+                auto notify_st = 
_exec_env->wal_mgr()->notify(it->second->get_wal_id());
+                if (!notify_st.ok()) {
+                    LOG(WARNING) << "notify wal " << it->second->get_wal_id() 
<< " fail";
+                }
             }
+            continue;
         }
-        std::sort(need_replay_wals.begin(), need_replay_wals.end());
-        for (const auto& wal : need_erase_wals) {
-            if (_replay_wal_map.erase(wal)) {
-                LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
-            } else {
-                LOG(WARNING) << "fail to erase wal " << wal << " from 
_replay_wal_map";
-            }
+        if (_need_replay(wal_info)) {
+            need_replay_wals.push_back(it->first);
         }
     }
+    for (const auto& wal : need_erase_wals) {
+        _replay_wal_map.erase(wal);
+    }
+    std::sort(need_replay_wals.begin(), need_replay_wals.end());
     for (const auto& wal : need_replay_wals) {
+        _replaying_queue.emplace_back(_replay_wal_map[wal]);
+        _replay_wal_map.erase(wal);
+    }
+}
+
+Status WalTable::relay_wal_one_by_one() {
+    std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
+    std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
+    while (!_replaying_queue.empty()) {
+        std::shared_ptr<WalInfo> wal_info = nullptr;
         {
             std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            if (_stop.load()) {
-                break;
+            wal_info = _replaying_queue.front();
+            _replaying_queue.pop_front();
+        }
+        wal_info->add_retry_num();
+        auto st = _replay_wal_internal(wal_info->get_wal_path());
+        if (!st.ok()) {
+            LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table=" 
<< _table_id
+                         << ", wal=" << wal_info->get_wal_path() << ", st=" << 
st.to_string();
+            if (!st.is<ErrorCode::NOT_FOUND>()) {
+                need_retry_wals.push_back(wal_info);
             } else {
-                auto it = _replay_wal_map.find(wal);
-                if (it != _replay_wal_map.end()) {
-                    auto& [retry_num, start_time, replaying] = it->second;
-                    replaying = true;
-                }
+                need_delete_wals.push_back(wal_info);
             }
+        } else {
+            need_delete_wals.push_back(wal_info);
+        }
+        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id
+                    << ", wal=" << wal_info->get_wal_path() << ", st=" << 
st.to_string();
+    }
+    {
+        std::lock_guard<std::mutex> lock(_replay_wal_lock);
+        for (auto retry_wal_info : need_retry_wals) {
+            _replay_wal_map.emplace(retry_wal_info->get_wal_path(), 
retry_wal_info);
         }
-        auto st = _replay_wal_internal(wal);
+    }
+    for (auto delete_wal_info : need_delete_wals) {
+        auto st = _delete_wal(delete_wal_info->get_wal_id());
         if (!st.ok()) {
-            std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            auto it = _replay_wal_map.find(wal);
-            if (it != _replay_wal_map.end()) {
-                auto& [retry_num, start_time, replaying] = it->second;
-                replaying = false;
-            }
-            LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id
-                         << ", table=" << _table_id << ", wal=" << wal << ", 
st=" << st.to_string();
-            break;
+            LOG(WARNING) << "fail to delete wal " << 
delete_wal_info->get_wal_path();
         }
-        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id 
<< ", label=" << wal
-                    << ", st=" << st.to_string();
     }
     return Status::OK();
 }
-
-std::string WalTable::_get_tmp_path(const std::string wal) {
-    std::vector<std::string> path_element;
-    doris::vectorized::WalReader::string_split(wal, "/", path_element);
-    std::stringstream ss;
-    int index = 0;
-    while (index < path_element.size() - 3) {
-        ss << path_element[index] << "/";
-        index++;
+Status WalTable::replay_wals() {
+    {
+        std::lock_guard<std::mutex> lock(_replay_wal_lock);
+        if (_replay_wal_map.empty()) {
+            LOG(INFO) << "_replay_wal_map is empty,skip relaying";
+            return Status::OK();
+        }
+        if (!_replaying_queue.empty()) {
+            LOG(INFO) << "_replaying_queue is not empty,skip relaying";
+            return Status::OK();
+        }
     }
-    ss << "tmp/";
-    while (index < path_element.size()) {
-        if (index != path_element.size() - 1) {
-            ss << path_element[index] << "_";
-        } else {
-            ss << path_element[index];
+    VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << 
_table_id
+               << ", wal size=" << _replay_wal_map.size();
+    pick_relay_wals();
+    RETURN_IF_ERROR(relay_wal_one_by_one());
+    return Status::OK();
+}
+
+Status WalTable::_rename_to_tmp_path(const std::string wal) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
         }
-        index++;
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_exec_env->wal_mgr()->tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
     }
-    return ss.str();
+    bool exists = false;
+    
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), 
&exists));
+    if (!exists) {
+        
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    return Status::OK();
 }
 
-bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) {
+bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
+    if (config::wait_relay_wal_finish) {
+        return true;
+    }
 #ifndef BE_TEST
-    auto& [retry_num, start_ts, replaying] = info;
-    auto replay_interval =
-            pow(2, retry_num) * 
config::group_commit_replay_wal_retry_interval_seconds * 1000;
-    return UnixMillis() - start_ts >= replay_interval;
+    auto replay_interval = pow(2, wal_info->get_retry_num()) *
+                           
config::group_commit_replay_wal_retry_interval_seconds * 1000;
+    return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
 #else
     return true;
 #endif
 }
 
-Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) {
+Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {

Review Comment:
   warning: method '_try_abort_txn' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal_table.h:53:
   ```diff
   -     Status _try_abort_txn(int64_t db_id, int64_t wal_id);
   +     static Status _try_abort_txn(int64_t db_id, int64_t wal_id);
   ```
   



##########
be/src/olap/wal_manager.cpp:
##########
@@ -521,5 +527,80 @@
     }
     return wal_path.string();
 }
+Status WalManager::add_wal_cv_map(int64_t wal_id, std::shared_ptr<std::mutex> 
lock,
+                                  std::shared_ptr<std::condition_variable> cv) 
{
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto it = _wal_lock_map.find(wal_id);
+    if (it != _wal_lock_map.end()) {
+        return Status::InternalError("wal {} is already in _wal_cv_map ", 
wal_id);
+    }
+    _wal_lock_map.emplace(wal_id, lock);
+    _wal_cv_map.emplace(wal_id, cv);
+    LOG(INFO) << "add  " << wal_id << " to _wal_cv_map";
+    return Status::OK();
+}
+Status WalManager::erase_wal_cv_map(int64_t wal_id) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    if (_wal_lock_map.erase(wal_id) && _wal_cv_map.erase(wal_id)) {
+        LOG(INFO) << "erase " << wal_id << " from _wal_cv_map";
+    } else {
+        return Status::InternalError("fail to erase wal {} from wal_cv_map", 
wal_id);
+    }
+    return Status::OK();
+}
+Status WalManager::wait_relay_wal_finish(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        LOG(INFO) << "start wait " << wal_id;
+        if (cv->wait_for(l, std::chrono::seconds(180)) == 
std::cv_status::timeout) {
+            LOG(WARNING) << "wait for " << wal_id << " is time out";
+        }
+        LOG(INFO) << "get wal " << wal_id << ",finish wait";
+        RETURN_IF_ERROR(erase_wal_cv_map(wal_id));
+        LOG(INFO) << "erase wal " << wal_id;
+    }
+    return Status::OK();
+}
+Status WalManager::notify(int64_t wal_id) {
+    std::shared_ptr<std::mutex> lock = nullptr;
+    std::shared_ptr<std::condition_variable> cv = nullptr;
+    auto st = get_lock_and_cv(wal_id, lock, cv);
+    if (st.ok()) {
+        std::unique_lock l(*(lock));
+        cv->notify_all();
+        LOG(INFO) << "get wal " << wal_id << ",notify all";
+    }
+    return Status::OK();
+}
+Status WalManager::get_lock_and_cv(int64_t wal_id, 
std::shared_ptr<std::mutex>& lock,
+                                   std::shared_ptr<std::condition_variable>& 
cv) {
+    std::lock_guard<std::shared_mutex> wrlock(_wal_cv_lock);
+    auto lock_it = _wal_lock_map.find(wal_id);
+    if (lock_it == _wal_lock_map.end()) {
+        return Status::InternalError("cannot find txn {} in wal_lock_map", 
wal_id);
+    }
+    lock = lock_it->second;
+    auto cv_it = _wal_cv_map.find(wal_id);
+    if (cv_it == _wal_cv_map.end()) {
+        return Status::InternalError("cannot find txn {} in wal_cv_map", 
wal_id);
+    }
+    cv = cv_it->second;
+    return Status::OK();
+}
+
+bool WalManager::find_wal_path(int64_t wal_id) {

Review Comment:
   warning: method 'find_wal_path' can be made static [readability-convert-member-functions-to-static]
   
   be/src/olap/wal_manager.h:88:
   ```diff
   -     bool find_wal_path(int64_t wal_id);
   +     static bool find_wal_path(int64_t wal_id);
   ```
   



##########
be/src/olap/wal_table.h:
##########
@@ -32,25 +34,32 @@ class WalTable {
     WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id);
     ~WalTable();
     // used when be start and there are wals need to do recovery
-    void add_wals(std::vector<std::string> wals);
+    void add_wal(int64_t wal_id, std::string wal);
+    void pick_relay_wals();
+    Status relay_wal_one_by_one();
     Status replay_wals();
     size_t size();
     void stop();
 
 public:
-    // <retry_num, start_time_ms, is_doing_replay>
-    using replay_wal_info = std::tuple<int64_t, int64_t, bool>;
+    // <column_name, column_index>
+    using ColumnInfo = std::pair<std::string, int64_t>;
 
 private:
-    Status _get_wal_info(const std::string& wal, 
std::shared_ptr<std::pair<int64_t, std::string>>&);
-    std::string _get_tmp_path(const std::string wal);
-    Status _send_request(int64_t wal_id, const std::string& wal, const 
std::string& label);
-    Status _abort_txn(int64_t db_id, int64_t wal_id);
+    Status _parse_wal_path(const std::string& wal,
+                           std::shared_ptr<std::pair<int64_t, std::string>>&);
+    Status _rename_to_tmp_path(const std::string wal);

Review Comment:
   warning: parameter 'wal' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls]
   
   ```suggestion
       Status _rename_to_tmp_path(std::string wal);
   ```
   



##########
be/src/olap/wal_table.cpp:
##########
@@ -41,125 +38,158 @@
 namespace doris {
 
 WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
-        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {}
+        : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), 
_stop(false) {
+    _http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
+}
 WalTable::~WalTable() {}
 
 #ifdef BE_TEST
-std::string k_request_line;
+Status k_stream_load_exec_status;
 #endif
 
-bool retry = false;
-
-void WalTable::add_wals(std::vector<std::string> wals) {
+void WalTable::add_wal(int64_t wal_id, std::string wal) {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
-    for (const auto& wal : wals) {
-        LOG(INFO) << "add replay wal " << wal;
-        _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
-    }
+    LOG(INFO) << "add replay wal " << wal;
+    auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
+    _replay_wal_map.emplace(wal, wal_info);
 }
-Status WalTable::replay_wals() {
+void WalTable::pick_relay_wals() {
+    std::lock_guard<std::mutex> lock(_replay_wal_lock);
     std::vector<std::string> need_replay_wals;
     std::vector<std::string> need_erase_wals;
-    {
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        if (_replay_wal_map.empty()) {
-            return Status::OK();
-        }
-        VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << 
_table_id
-                   << ", wal size=" << _replay_wal_map.size();
-        for (auto& [wal, info] : _replay_wal_map) {
-            auto& [retry_num, start_ts, replaying] = info;
-            if (replaying) {
-                LOG(INFO) << wal << " is replaying, skip this round";
-                return Status::OK();
-            }
-            if (retry_num >= config::group_commit_replay_wal_retry_num) {
-                LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", 
table=" << _table_id
-                             << ", wal=" << wal
-                             << ", retry_num=" << 
config::group_commit_replay_wal_retry_num;
-                std::string rename_path = _get_tmp_path(wal);
-                LOG(INFO) << "rename wal from " << wal << " to " << 
rename_path;
-                std::rename(wal.c_str(), rename_path.c_str());
-                need_erase_wals.push_back(wal);
-                continue;
+    for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) 
{
+        auto wal_info = it->second;
+        if (wal_info->get_retry_num() >= 
config::group_commit_replay_wal_retry_num) {
+            LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", 
table=" << _table_id
+                         << ", wal=" << it->first << ", retry_num=" << 
wal_info->get_retry_num();
+            auto st = _rename_to_tmp_path(it->first);
+            if (!st.ok()) {
+                LOG(WARNING) << "rename " << it->first << " fail"
+                             << ",st:" << st.to_string();
             }
-            if (_need_replay(info)) {
-                need_replay_wals.push_back(wal);
+            need_erase_wals.push_back(it->first);
+            if (config::wait_relay_wal_finish) {
+                auto notify_st = 
_exec_env->wal_mgr()->notify(it->second->get_wal_id());
+                if (!notify_st.ok()) {
+                    LOG(WARNING) << "notify wal " << it->second->get_wal_id() 
<< " fail";
+                }
             }
+            continue;
         }
-        std::sort(need_replay_wals.begin(), need_replay_wals.end());
-        for (const auto& wal : need_erase_wals) {
-            if (_replay_wal_map.erase(wal)) {
-                LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
-            } else {
-                LOG(WARNING) << "fail to erase wal " << wal << " from 
_replay_wal_map";
-            }
+        if (_need_replay(wal_info)) {
+            need_replay_wals.push_back(it->first);
         }
     }
+    for (const auto& wal : need_erase_wals) {
+        _replay_wal_map.erase(wal);
+    }
+    std::sort(need_replay_wals.begin(), need_replay_wals.end());
     for (const auto& wal : need_replay_wals) {
+        _replaying_queue.emplace_back(_replay_wal_map[wal]);
+        _replay_wal_map.erase(wal);
+    }
+}
+
+Status WalTable::relay_wal_one_by_one() {
+    std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
+    std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
+    while (!_replaying_queue.empty()) {
+        std::shared_ptr<WalInfo> wal_info = nullptr;
         {
             std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            if (_stop.load()) {
-                break;
+            wal_info = _replaying_queue.front();
+            _replaying_queue.pop_front();
+        }
+        wal_info->add_retry_num();
+        auto st = _replay_wal_internal(wal_info->get_wal_path());
+        if (!st.ok()) {
+            LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table=" 
<< _table_id
+                         << ", wal=" << wal_info->get_wal_path() << ", st=" << 
st.to_string();
+            if (!st.is<ErrorCode::NOT_FOUND>()) {
+                need_retry_wals.push_back(wal_info);
             } else {
-                auto it = _replay_wal_map.find(wal);
-                if (it != _replay_wal_map.end()) {
-                    auto& [retry_num, start_time, replaying] = it->second;
-                    replaying = true;
-                }
+                need_delete_wals.push_back(wal_info);
             }
+        } else {
+            need_delete_wals.push_back(wal_info);
+        }
+        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id
+                    << ", wal=" << wal_info->get_wal_path() << ", st=" << 
st.to_string();
+    }
+    {
+        std::lock_guard<std::mutex> lock(_replay_wal_lock);
+        for (auto retry_wal_info : need_retry_wals) {
+            _replay_wal_map.emplace(retry_wal_info->get_wal_path(), 
retry_wal_info);
         }
-        auto st = _replay_wal_internal(wal);
+    }
+    for (auto delete_wal_info : need_delete_wals) {
+        auto st = _delete_wal(delete_wal_info->get_wal_id());
         if (!st.ok()) {
-            std::lock_guard<std::mutex> lock(_replay_wal_lock);
-            auto it = _replay_wal_map.find(wal);
-            if (it != _replay_wal_map.end()) {
-                auto& [retry_num, start_time, replaying] = it->second;
-                replaying = false;
-            }
-            LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id
-                         << ", table=" << _table_id << ", wal=" << wal << ", 
st=" << st.to_string();
-            break;
+            LOG(WARNING) << "fail to delete wal " << 
delete_wal_info->get_wal_path();
         }
-        VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id 
<< ", label=" << wal
-                    << ", st=" << st.to_string();
     }
     return Status::OK();
 }
-
-std::string WalTable::_get_tmp_path(const std::string wal) {
-    std::vector<std::string> path_element;
-    doris::vectorized::WalReader::string_split(wal, "/", path_element);
-    std::stringstream ss;
-    int index = 0;
-    while (index < path_element.size() - 3) {
-        ss << path_element[index] << "/";
-        index++;
+Status WalTable::replay_wals() {
+    {
+        std::lock_guard<std::mutex> lock(_replay_wal_lock);
+        if (_replay_wal_map.empty()) {
+            LOG(INFO) << "_replay_wal_map is empty,skip relaying";
+            return Status::OK();
+        }
+        if (!_replaying_queue.empty()) {
+            LOG(INFO) << "_replaying_queue is not empty,skip relaying";
+            return Status::OK();
+        }
     }
-    ss << "tmp/";
-    while (index < path_element.size()) {
-        if (index != path_element.size() - 1) {
-            ss << path_element[index] << "_";
-        } else {
-            ss << path_element[index];
+    VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << 
_table_id
+               << ", wal size=" << _replay_wal_map.size();
+    pick_relay_wals();
+    RETURN_IF_ERROR(relay_wal_one_by_one());
+    return Status::OK();
+}
+
+Status WalTable::_rename_to_tmp_path(const std::string wal) {
+    io::Path wal_path = wal;
+    std::list<std::string> path_element;
+    for (int i = 0; i < 3; ++i) {
+        if (!wal_path.has_parent_path()) {
+            return Status::InternalError("parent path is not enough when 
rename " + wal);
         }
-        index++;
+        path_element.push_front(wal_path.filename().string());
+        wal_path = wal_path.parent_path();
+    }
+    wal_path.append(_exec_env->wal_mgr()->tmp);
+    for (auto path : path_element) {
+        wal_path.append(path);
     }
-    return ss.str();
+    bool exists = false;
+    RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists));
+    if (!exists) {
+        RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+    }
+    auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+    if (res < 0) {
+        return Status::InternalError("rename fail on path " + wal);
+    }
+    LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+    return Status::OK();
 }
 
-bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) {
+bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {

Review Comment:
   warning: method '_need_replay' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
   ```
   



##########
be/src/olap/wal_table.cpp:
##########
@@ -181,47 +211,25 @@
 
 Status WalTable::_replay_wal_internal(const std::string& wal) {
     LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << 
_table_id << ", wal=" << wal;
-    // start a new stream load
-    {
-        std::lock_guard<std::mutex> lock(_replay_wal_lock);
-        auto it = _replay_wal_map.find(wal);
-        if (it != _replay_wal_map.end()) {
-            auto& [retry_num, start_time, replaying] = it->second;
-            ++retry_num;
-            replaying = true;
-        } else {
-            LOG(WARNING) << "can not find wal in stream load replay map. db=" 
<< _db_id
-                         << ", table=" << _table_id << ", wal=" << wal;
-            return Status::OK();
-        }
-    }
     std::shared_ptr<std::pair<int64_t, std::string>> pair = nullptr;
-    RETURN_IF_ERROR(_get_wal_info(wal, pair));
+    RETURN_IF_ERROR(_parse_wal_path(wal, pair));
     auto wal_id = pair->first;
     auto label = pair->second;
 #ifndef BE_TEST
-    auto st = _abort_txn(_db_id, wal_id);
-    if (!st.ok()) {
-        LOG(WARNING) << "abort txn " << wal_id << " fail";
-    }
-    auto get_st = _get_column_info(_db_id, _table_id);
-    if (!get_st.ok()) {
-        if (get_st.is<ErrorCode::NOT_FOUND>()) {
-            {
-                std::lock_guard<std::mutex> lock(_replay_wal_lock);
-                _replay_wal_map.erase(wal);
-            }
-            RETURN_IF_ERROR(_delete_wal(wal_id));
+    if (!config::wait_relay_wal_finish) {
+        auto st = _try_abort_txn(_db_id, wal_id);
+        if (!st.ok()) {
+            LOG(WARNING) << "abort txn " << wal_id << " fail";
         }
-        return get_st;
     }
+    RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
 #endif
-    RETURN_IF_ERROR(_send_request(wal_id, wal, label));
+    RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label));
     return Status::OK();
 }
 
-Status WalTable::_get_wal_info(const std::string& wal,
-                               std::shared_ptr<std::pair<int64_t, 
std::string>>& pair) {
+Status WalTable::_parse_wal_path(const std::string& wal,

Review Comment:
   warning: method '_parse_wal_path' can be made static [readability-convert-member-functions-to-static]
   
   ```suggestion
   static Status WalTable::_parse_wal_path(const std::string& wal,
   ```
   



##########
be/src/runtime/runtime_state.h:
##########
@@ -257,6 +257,10 @@ class RuntimeState {
 
     int64_t wal_id() { return _wal_id; }
 
+    void set_txn_id(int64_t txn_id) { _txn_id = txn_id; }
+
+    int64_t txn_id() { return _txn_id; }

Review Comment:
   warning: method 'txn_id' can be made const [readability-make-member-function-const]
   
   ```suggestion
       int64_t txn_id() const { return _txn_id; }
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to