github-actions[bot] commented on code in PR #29183: URL: https://github.com/apache/doris/pull/29183#discussion_r1437814800
########## be/src/olap/wal_info.cpp: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_info.h" +namespace doris { +WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) + : _wal_id(wal_id), + _wal_path(wal_path), + _retry_num(retry_num), + _start_time_ms(start_time_ms) {} +WalInfo::~WalInfo() {} +int64_t WalInfo::get_wal_id() { Review Comment: warning: method 'get_wal_id' can be made const [readability-make-member-function-const] ```suggestion int64_t WalInfo::get_wal_id() const { ``` be/src/olap/wal_info.h:24: ```diff - int64_t get_wal_id(); + int64_t get_wal_id() const; ``` ########## be/src/olap/wal_table.cpp: ########## @@ -236,171 +229,102 @@ return Status::OK(); } -void http_request_done(struct evhttp_request* req, void* arg) { - std::stringstream out; - std::string status; - std::string msg; - std::string wal_id; - size_t len = 0; - if (req != nullptr) { - auto input = evhttp_request_get_input_buffer(req); - char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - while (request_line != nullptr) { - std::string s(request_line); - out << request_line; - request_line = evbuffer_readln(input, &len, 
EVBUFFER_EOL_CRLF); - } - auto out_str = out.str(); - LOG(INFO) << "replay wal out_str:" << out_str; - rapidjson::Document doc; - if (!out_str.empty()) { - doc.Parse(out.str().c_str()); - status = std::string(doc["Status"].GetString()); - msg = std::string(doc["Message"].GetString()); - LOG(INFO) << "replay wal status:" << status << ",msg:" << msg; - if (status.find("Fail") != status.npos) { - if (msg.find("Label") != msg.npos && - msg.find("has already been used") != msg.npos) { - retry = false; - } else { - retry = true; - } - } else { - retry = false; - } - } else { - retry = true; - } - } else { - LOG(WARNING) << "req is null"; - } - - if (arg != nullptr) { - event_base_loopbreak((struct event_base*)arg); - } else { - LOG(WARNING) << "arg is null"; - } -} - -Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { -#ifndef BE_TEST - struct event_base* base = nullptr; - struct evhttp_connection* conn = nullptr; - struct evhttp_request* req = nullptr; - retry = false; - event_init(); - base = event_base_new(); - conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port); - evhttp_connection_set_base(conn, base); - req = evhttp_request_new(http_request_done, base); - evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), label.c_str()); - evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str()); - evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str()); +Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, Review Comment: warning: method '_construct_sql_str' can be made static [readability-convert-member-functions-to-static] ```suggestion static Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, ``` ########## be/src/olap/wal_info.cpp: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_info.h" +namespace doris { +WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) + : _wal_id(wal_id), + _wal_path(wal_path), + _retry_num(retry_num), + _start_time_ms(start_time_ms) {} +WalInfo::~WalInfo() {} +int64_t WalInfo::get_wal_id() { + return _wal_id; +} +std::string WalInfo::get_wal_path() { + return _wal_path; +} +int64_t WalInfo::get_retry_num() { Review Comment: warning: method 'get_retry_num' can be made const [readability-make-member-function-const] ```suggestion int64_t WalInfo::get_retry_num() const { ``` be/src/olap/wal_info.h:25: ```diff - int64_t get_retry_num(); + int64_t get_retry_num() const; ``` ########## be/src/olap/wal_table.cpp: ########## @@ -41,125 +38,145 @@ namespace doris { WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {} + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) { + _http_stream_action = std::make_shared<HttpStreamAction>(exec_env); +} WalTable::~WalTable() {} -#ifdef BE_TEST -std::string k_request_line; -#endif - -bool retry = false; - -void WalTable::add_wals(std::vector<std::string> wals) { +void WalTable::add_wal(int64_t wal_id, 
std::string wal) { std::lock_guard<std::mutex> lock(_replay_wal_lock); - for (const auto& wal : wals) { - LOG(INFO) << "add replay wal " << wal; - _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); - } + LOG(INFO) << "add replay wal " << wal; + auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis()); + _replay_wal_map.emplace(wal, wal_info); } -Status WalTable::replay_wals() { +void WalTable::pick_relay_wals() { + std::lock_guard<std::mutex> lock(_replay_wal_lock); std::vector<std::string> need_replay_wals; std::vector<std::string> need_erase_wals; - { - std::lock_guard<std::mutex> lock(_replay_wal_lock); - if (_replay_wal_map.empty()) { - return Status::OK(); - } - VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id - << ", wal size=" << _replay_wal_map.size(); - for (auto& [wal, info] : _replay_wal_map) { - auto& [retry_num, start_ts, replaying] = info; - if (replaying) { - LOG(INFO) << wal << " is replaying, skip this round"; - return Status::OK(); - } - if (retry_num >= config::group_commit_replay_wal_retry_num) { - LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id - << ", wal=" << wal - << ", retry_num=" << config::group_commit_replay_wal_retry_num; - std::string rename_path = _get_tmp_path(wal); - LOG(INFO) << "rename wal from " << wal << " to " << rename_path; - std::rename(wal.c_str(), rename_path.c_str()); - need_erase_wals.push_back(wal); - continue; - } - if (_need_replay(info)) { - need_replay_wals.push_back(wal); + for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) { + auto wal_info = it->second; + if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { + LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id + << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num(); + auto st = _rename_to_tmp_path(it->first); + if (!st.ok()) { + LOG(WARNING) << "rename " << it->first << 
" fail" + << ",st:" << st.to_string(); } + need_erase_wals.push_back(it->first); + continue; } - std::sort(need_replay_wals.begin(), need_replay_wals.end()); - for (const auto& wal : need_erase_wals) { - if (_replay_wal_map.erase(wal)) { - LOG(INFO) << "erase wal " << wal << " from _replay_wal_map"; - } else { - LOG(WARNING) << "fail to erase wal " << wal << " from _replay_wal_map"; - } + if (_need_replay(wal_info)) { + need_replay_wals.push_back(it->first); } } + for (const auto& wal : need_erase_wals) { + _replay_wal_map.erase(wal); + } + std::sort(need_replay_wals.begin(), need_replay_wals.end()); for (const auto& wal : need_replay_wals) { + _replaying_queue.emplace_back(_replay_wal_map[wal]); + _replay_wal_map.erase(wal); + } +} + +Status WalTable::relay_wal_one_by_one() { + std::vector<std::shared_ptr<WalInfo>> need_retry_wals; + std::vector<std::shared_ptr<WalInfo>> need_delete_wals; + while (!_replaying_queue.empty()) { + std::shared_ptr<WalInfo> wal_info = nullptr; { std::lock_guard<std::mutex> lock(_replay_wal_lock); - if (_stop.load()) { - break; + wal_info = _replaying_queue.front(); + _replaying_queue.pop_front(); + } + wal_info->add_retry_num(); + auto st = _replay_wal_internal(wal_info->get_wal_path()); + if (!st.ok()) { + LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); + if (!st.is<ErrorCode::NOT_FOUND>()) { + need_retry_wals.push_back(wal_info); } else { - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = true; - } + need_delete_wals.push_back(wal_info); } + } else { + need_delete_wals.push_back(wal_info); + } + VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); + } + { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + for (auto retry_wal_info : 
need_retry_wals) { + _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info); } - auto st = _replay_wal_internal(wal); + } + for (auto delete_wal_info : need_delete_wals) { + auto st = _delete_wal(delete_wal_info->get_wal_id()); if (!st.ok()) { - std::lock_guard<std::mutex> lock(_replay_wal_lock); - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = false; - } - LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id - << ", table=" << _table_id << ", wal=" << wal << ", st=" << st.to_string(); - break; + LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path(); } - VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id << ", label=" << wal - << ", st=" << st.to_string(); } return Status::OK(); } - -std::string WalTable::_get_tmp_path(const std::string wal) { - std::vector<std::string> path_element; - doris::vectorized::WalReader::string_split(wal, "/", path_element); - std::stringstream ss; - int index = 0; - while (index < path_element.size() - 3) { - ss << path_element[index] << "/"; - index++; +Status WalTable::replay_wals() { + { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + if (_replay_wal_map.empty()) { + LOG(INFO) << "_replay_wal_map is empty,skip relaying"; + return Status::OK(); + } + if (!_replaying_queue.empty()) { + LOG(INFO) << "_replaying_queue is not empty,skip relaying"; + return Status::OK(); + } } - ss << "tmp/"; - while (index < path_element.size()) { - if (index != path_element.size() - 1) { - ss << path_element[index] << "_"; - } else { - ss << path_element[index]; + VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id + << ", wal size=" << _replay_wal_map.size(); + pick_relay_wals(); + RETURN_IF_ERROR(relay_wal_one_by_one()); + return Status::OK(); +} + +Status WalTable::_rename_to_tmp_path(const std::string wal) { + io::Path wal_path = wal; + 
std::list<std::string> path_element; + for (int i = 0; i < 3; ++i) { + if (!wal_path.has_parent_path()) { + return Status::InternalError("parent path is not enough when rename " + wal); } - index++; + path_element.push_front(wal_path.filename().string()); + wal_path = wal_path.parent_path(); + } + wal_path.append(_exec_env->wal_mgr()->tmp); + for (auto path : path_element) { + wal_path.append(path); + } + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); } - return ss.str(); + auto res = std::rename(wal.c_str(), wal_path.string().c_str()); + if (res < 0) { + return Status::InternalError("rename fail on path " + wal); + } + LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); + return Status::OK(); } -bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) { +bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) { #ifndef BE_TEST - auto& [retry_num, start_ts, replaying] = info; - auto replay_interval = - pow(2, retry_num) * config::group_commit_replay_wal_retry_interval_seconds * 1000; - return UnixMillis() - start_ts >= replay_interval; + auto replay_interval = pow(2, wal_info->get_retry_num()) * + config::group_commit_replay_wal_retry_interval_seconds * 1000; + return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval; #else return true; #endif } -Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) { +Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) { Review Comment: warning: method '_try_abort_txn' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_table.h:53: ```diff - Status _try_abort_txn(int64_t db_id, int64_t wal_id); + static Status _try_abort_txn(int64_t db_id, int64_t wal_id); ``` ########## be/src/olap/wal_table.cpp: ########## @@ -41,125 +38,145 @@ namespace doris { 
WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {} + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) { + _http_stream_action = std::make_shared<HttpStreamAction>(exec_env); +} WalTable::~WalTable() {} -#ifdef BE_TEST -std::string k_request_line; -#endif - -bool retry = false; - -void WalTable::add_wals(std::vector<std::string> wals) { +void WalTable::add_wal(int64_t wal_id, std::string wal) { Review Comment: warning: method 'add_wal' can be made static [readability-convert-member-functions-to-static] ```suggestion static void WalTable::add_wal(int64_t wal_id, std::string wal) { ``` ########## be/src/olap/wal_info.cpp: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "olap/wal_info.h" +namespace doris { +WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) Review Comment: warning: pass by value and use std::move [modernize-pass-by-value] be/src/olap/wal_info.cpp:18: ```diff - namespace doris { + + #include <utility> + namespace doris { ``` be/src/olap/wal_info.cpp:21: ```diff - _wal_path(wal_path), + _wal_path(std::move(wal_path)), ``` ########## be/src/olap/wal_table.h: ########## @@ -32,25 +34,32 @@ class WalTable { WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id); ~WalTable(); // used when be start and there are wals need to do recovery - void add_wals(std::vector<std::string> wals); + void add_wal(int64_t wal_id, std::string wal); + void pick_relay_wals(); + Status relay_wal_one_by_one(); Status replay_wals(); size_t size(); void stop(); public: - // <retry_num, start_time_ms, is_doing_replay> - using replay_wal_info = std::tuple<int64_t, int64_t, bool>; + // <column_name, column_index> + using ColumnInfo = std::pair<std::string, int64_t>; private: - Status _get_wal_info(const std::string& wal, std::shared_ptr<std::pair<int64_t, std::string>>&); - std::string _get_tmp_path(const std::string wal); - Status _send_request(int64_t wal_id, const std::string& wal, const std::string& label); - Status _abort_txn(int64_t db_id, int64_t wal_id); + Status _parse_wal_path(const std::string& wal, + std::shared_ptr<std::pair<int64_t, std::string>>&); + Status _rename_to_tmp_path(const std::string wal); Review Comment: warning: parameter 'wal' is const-qualified in the function declaration; const-qualification of parameters only has an effect in function definitions [readability-avoid-const-params-in-decls] ```suggestion Status _rename_to_tmp_path(std::string wal); ``` ########## be/src/olap/wal_info.cpp: ########## @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "olap/wal_info.h" +namespace doris { +WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms) + : _wal_id(wal_id), + _wal_path(wal_path), + _retry_num(retry_num), + _start_time_ms(start_time_ms) {} +WalInfo::~WalInfo() {} +int64_t WalInfo::get_wal_id() { + return _wal_id; +} +std::string WalInfo::get_wal_path() { + return _wal_path; +} +int64_t WalInfo::get_retry_num() { + return _retry_num; +} +int64_t WalInfo::get_start_time_ms() { Review Comment: warning: method 'get_start_time_ms' can be made const [readability-make-member-function-const] ```suggestion int64_t WalInfo::get_start_time_ms() const { ``` be/src/olap/wal_info.h:26: ```diff - int64_t get_start_time_ms(); + int64_t get_start_time_ms() const; ``` ########## be/src/olap/wal_manager.cpp: ########## @@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() { &_update_wal_dirs_info_thread); } -void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS wal_status) { +void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status) { Review Comment: warning: method 'add_wal_status_queue' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_manager.h:73: ```diff - void 
add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); + static void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus wal_status); ``` ########## be/src/olap/wal_table.cpp: ########## @@ -181,47 +198,23 @@ Status WalTable::_replay_wal_internal(const std::string& wal) { LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" << _table_id << ", wal=" << wal; - // start a new stream load - { - std::lock_guard<std::mutex> lock(_replay_wal_lock); - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - ++retry_num; - replaying = true; - } else { - LOG(WARNING) << "can not find wal in stream load replay map. db=" << _db_id - << ", table=" << _table_id << ", wal=" << wal; - return Status::OK(); - } - } std::shared_ptr<std::pair<int64_t, std::string>> pair = nullptr; - RETURN_IF_ERROR(_get_wal_info(wal, pair)); + RETURN_IF_ERROR(_parse_wal_path(wal, pair)); auto wal_id = pair->first; auto label = pair->second; #ifndef BE_TEST - auto st = _abort_txn(_db_id, wal_id); + auto st = _try_abort_txn(_db_id, wal_id); if (!st.ok()) { LOG(WARNING) << "abort txn " << wal_id << " fail"; } - auto get_st = _get_column_info(_db_id, _table_id); - if (!get_st.ok()) { - if (get_st.is<ErrorCode::NOT_FOUND>()) { - { - std::lock_guard<std::mutex> lock(_replay_wal_lock); - _replay_wal_map.erase(wal); - } - RETURN_IF_ERROR(_delete_wal(wal_id)); - } - return get_st; - } + RETURN_IF_ERROR(_get_column_info(_db_id, _table_id)); #endif - RETURN_IF_ERROR(_send_request(wal_id, wal, label)); + RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label)); return Status::OK(); } -Status WalTable::_get_wal_info(const std::string& wal, - std::shared_ptr<std::pair<int64_t, std::string>>& pair) { +Status WalTable::_parse_wal_path(const std::string& wal, Review Comment: warning: method '_parse_wal_path' can be made static [readability-convert-member-functions-to-static] 
```suggestion static Status WalTable::_parse_wal_path(const std::string& wal, ``` ########## be/src/olap/wal_table.cpp: ########## @@ -236,171 +229,102 @@ return Status::OK(); } -void http_request_done(struct evhttp_request* req, void* arg) { - std::stringstream out; - std::string status; - std::string msg; - std::string wal_id; - size_t len = 0; - if (req != nullptr) { - auto input = evhttp_request_get_input_buffer(req); - char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - while (request_line != nullptr) { - std::string s(request_line); - out << request_line; - request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF); - } - auto out_str = out.str(); - LOG(INFO) << "replay wal out_str:" << out_str; - rapidjson::Document doc; - if (!out_str.empty()) { - doc.Parse(out.str().c_str()); - status = std::string(doc["Status"].GetString()); - msg = std::string(doc["Message"].GetString()); - LOG(INFO) << "replay wal status:" << status << ",msg:" << msg; - if (status.find("Fail") != status.npos) { - if (msg.find("Label") != msg.npos && - msg.find("has already been used") != msg.npos) { - retry = false; - } else { - retry = true; - } - } else { - retry = false; - } - } else { - retry = true; - } - } else { - LOG(WARNING) << "req is null"; - } - - if (arg != nullptr) { - event_base_loopbreak((struct event_base*)arg); - } else { - LOG(WARNING) << "arg is null"; - } -} - -Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const std::string& label) { -#ifndef BE_TEST - struct event_base* base = nullptr; - struct evhttp_connection* conn = nullptr; - struct evhttp_request* req = nullptr; - retry = false; - event_init(); - base = event_base_new(); - conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port); - evhttp_connection_set_base(conn, base); - req = evhttp_request_new(http_request_done, base); - evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(), label.c_str()); - evhttp_add_header(req->output_headers, 
HTTP_AUTH_CODE.c_str(), std::to_string(wal_id).c_str()); - evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(), std::to_string(wal_id).c_str()); +Status WalTable::_construct_sql_str(const std::string& wal, const std::string& label, + std::string& sql_str, std::vector<size_t>& index_vector) { std::string columns; RETURN_IF_ERROR(_read_wal_header(wal, columns)); std::vector<std::string> column_id_element; doris::vectorized::WalReader::string_split(columns, ",", column_id_element); - std::vector<size_t> index_vector; std::stringstream ss_name; std::stringstream ss_id; int index_raw = 0; for (auto column_id_str : column_id_element) { try { int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10); - auto it = _column_id_name_map.find(column_id); - auto it2 = _column_id_index_map.find(column_id); - if (it != _column_id_name_map.end() && it2 != _column_id_index_map.end()) { - ss_name << "`" << it->second << "`,"; - ss_id << "c" << std::to_string(_column_id_index_map[column_id]) << ","; + auto it = _column_id_info_map.find(column_id); + if (it != _column_id_info_map.end()) { + ss_name << "`" << it->second->first << "`,"; + ss_id << "c" << std::to_string(it->second->second) << ","; index_vector.emplace_back(index_raw); - _column_id_name_map.erase(column_id); - _column_id_index_map.erase(column_id); } index_raw++; } catch (const std::invalid_argument& e) { return Status::InvalidArgument("Invalid format, {}", e.what()); } } - _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); auto name = ss_name.str().substr(0, ss_name.str().size() - 1); auto id = ss_id.str().substr(0, ss_id.str().size() - 1); std::stringstream ss; ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL " << label << " (" << name << ") select " << id << " from http_stream(\"format\" = \"wal\", \"table_id\" = \"" << std::to_string(_table_id) << "\")"; - evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str()); - evbuffer* output = 
evhttp_request_get_output_buffer(req); - evbuffer_add_printf(output, "replay wal %s", std::to_string(wal_id).c_str()); - - evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream"); - evhttp_connection_set_timeout(req->evcon, 300); - - event_base_dispatch(base); - evhttp_connection_free(conn); - event_base_free(base); - -#else - std::stringstream out; - out << k_request_line; - auto out_str = out.str(); - rapidjson::Document doc; - doc.Parse(out_str.c_str()); - auto status = std::string(doc["Status"].GetString()); - if (status.find("Fail") != status.npos) { - retry = true; - } else { - retry = false; + sql_str = ss.str().data(); + return Status::OK(); +} +Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal, + const std::string& label) { + std::string sql_str; + std::vector<size_t> index_vector; + RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector)); + _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector); + std::shared_ptr<StreamLoadContext> ctx = std::make_shared<StreamLoadContext>(_exec_env); + ctx->sql_str = sql_str; + ctx->wal_id = wal_id; + ctx->auth.auth_code = wal_id; + ctx->label = label; + auto st = _http_stream_action->process_put(nullptr, ctx); + if (st.ok()) { + // wait stream load finish + RETURN_IF_ERROR(ctx->future.get()); + if (ctx->status.ok()) { + auto commit_st = _exec_env->stream_load_executor()->commit_txn(ctx.get()); + st = commit_st; + } else if (!ctx->status.ok()) { + LOG(WARNING) << "handle streaming load failed, id=" << ctx->id + << ", errmsg=" << ctx->status; + _exec_env->stream_load_executor()->rollback_txn(ctx.get()); + st = ctx->status; + } } + _exec_env->wal_mgr()->erase_wal_column_index(wal_id); + LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string(); + return st; +} + +Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, Review Comment: warning: method '_replay_one_txn_with_stremaload' can be made static 
[readability-convert-member-functions-to-static] ```suggestion static Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const std::string& wal, ``` ########## be/src/olap/wal_table.cpp: ########## @@ -41,125 +38,145 @@ namespace doris { WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id) - : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) {} + : _exec_env(exec_env), _db_id(db_id), _table_id(table_id), _stop(false) { + _http_stream_action = std::make_shared<HttpStreamAction>(exec_env); +} WalTable::~WalTable() {} -#ifdef BE_TEST -std::string k_request_line; -#endif - -bool retry = false; - -void WalTable::add_wals(std::vector<std::string> wals) { +void WalTable::add_wal(int64_t wal_id, std::string wal) { std::lock_guard<std::mutex> lock(_replay_wal_lock); - for (const auto& wal : wals) { - LOG(INFO) << "add replay wal " << wal; - _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false}); - } + LOG(INFO) << "add replay wal " << wal; + auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis()); + _replay_wal_map.emplace(wal, wal_info); } -Status WalTable::replay_wals() { +void WalTable::pick_relay_wals() { + std::lock_guard<std::mutex> lock(_replay_wal_lock); std::vector<std::string> need_replay_wals; std::vector<std::string> need_erase_wals; - { - std::lock_guard<std::mutex> lock(_replay_wal_lock); - if (_replay_wal_map.empty()) { - return Status::OK(); - } - VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id - << ", wal size=" << _replay_wal_map.size(); - for (auto& [wal, info] : _replay_wal_map) { - auto& [retry_num, start_ts, replaying] = info; - if (replaying) { - LOG(INFO) << wal << " is replaying, skip this round"; - return Status::OK(); - } - if (retry_num >= config::group_commit_replay_wal_retry_num) { - LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id - << ", wal=" << wal - << ", retry_num=" << 
config::group_commit_replay_wal_retry_num; - std::string rename_path = _get_tmp_path(wal); - LOG(INFO) << "rename wal from " << wal << " to " << rename_path; - std::rename(wal.c_str(), rename_path.c_str()); - need_erase_wals.push_back(wal); - continue; - } - if (_need_replay(info)) { - need_replay_wals.push_back(wal); + for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) { + auto wal_info = it->second; + if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) { + LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id + << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num(); + auto st = _rename_to_tmp_path(it->first); + if (!st.ok()) { + LOG(WARNING) << "rename " << it->first << " fail" + << ",st:" << st.to_string(); } + need_erase_wals.push_back(it->first); + continue; } - std::sort(need_replay_wals.begin(), need_replay_wals.end()); - for (const auto& wal : need_erase_wals) { - if (_replay_wal_map.erase(wal)) { - LOG(INFO) << "erase wal " << wal << " from _replay_wal_map"; - } else { - LOG(WARNING) << "fail to erase wal " << wal << " from _replay_wal_map"; - } + if (_need_replay(wal_info)) { + need_replay_wals.push_back(it->first); } } + for (const auto& wal : need_erase_wals) { + _replay_wal_map.erase(wal); + } + std::sort(need_replay_wals.begin(), need_replay_wals.end()); for (const auto& wal : need_replay_wals) { + _replaying_queue.emplace_back(_replay_wal_map[wal]); + _replay_wal_map.erase(wal); + } +} + +Status WalTable::relay_wal_one_by_one() { + std::vector<std::shared_ptr<WalInfo>> need_retry_wals; + std::vector<std::shared_ptr<WalInfo>> need_delete_wals; + while (!_replaying_queue.empty()) { + std::shared_ptr<WalInfo> wal_info = nullptr; { std::lock_guard<std::mutex> lock(_replay_wal_lock); - if (_stop.load()) { - break; + wal_info = _replaying_queue.front(); + _replaying_queue.pop_front(); + } + wal_info->add_retry_num(); + auto st = 
_replay_wal_internal(wal_info->get_wal_path()); + if (!st.ok()) { + LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); + if (!st.is<ErrorCode::NOT_FOUND>()) { + need_retry_wals.push_back(wal_info); } else { - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = true; - } + need_delete_wals.push_back(wal_info); } + } else { + need_delete_wals.push_back(wal_info); + } + VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id + << ", wal=" << wal_info->get_wal_path() << ", st=" << st.to_string(); + } + { + std::lock_guard<std::mutex> lock(_replay_wal_lock); + for (auto retry_wal_info : need_retry_wals) { + _replay_wal_map.emplace(retry_wal_info->get_wal_path(), retry_wal_info); } - auto st = _replay_wal_internal(wal); + } + for (auto delete_wal_info : need_delete_wals) { + auto st = _delete_wal(delete_wal_info->get_wal_id()); if (!st.ok()) { - std::lock_guard<std::mutex> lock(_replay_wal_lock); - auto it = _replay_wal_map.find(wal); - if (it != _replay_wal_map.end()) { - auto& [retry_num, start_time, replaying] = it->second; - replaying = false; - } - LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id - << ", table=" << _table_id << ", wal=" << wal << ", st=" << st.to_string(); - break; + LOG(WARNING) << "fail to delete wal " << delete_wal_info->get_wal_path(); } - VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id << ", label=" << wal - << ", st=" << st.to_string(); } return Status::OK(); } - -std::string WalTable::_get_tmp_path(const std::string wal) { - std::vector<std::string> path_element; - doris::vectorized::WalReader::string_split(wal, "/", path_element); - std::stringstream ss; - int index = 0; - while (index < path_element.size() - 3) { - ss << path_element[index] << "/"; - index++; +Status WalTable::replay_wals() { + { + 
std::lock_guard<std::mutex> lock(_replay_wal_lock); + if (_replay_wal_map.empty()) { + LOG(INFO) << "_replay_wal_map is empty,skip relaying"; + return Status::OK(); + } + if (!_replaying_queue.empty()) { + LOG(INFO) << "_replaying_queue is not empty,skip relaying"; + return Status::OK(); + } } - ss << "tmp/"; - while (index < path_element.size()) { - if (index != path_element.size() - 1) { - ss << path_element[index] << "_"; - } else { - ss << path_element[index]; + VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" << _table_id + << ", wal size=" << _replay_wal_map.size(); + pick_relay_wals(); + RETURN_IF_ERROR(relay_wal_one_by_one()); + return Status::OK(); +} + +Status WalTable::_rename_to_tmp_path(const std::string wal) { + io::Path wal_path = wal; + std::list<std::string> path_element; + for (int i = 0; i < 3; ++i) { + if (!wal_path.has_parent_path()) { + return Status::InternalError("parent path is not enough when rename " + wal); } - index++; + path_element.push_front(wal_path.filename().string()); + wal_path = wal_path.parent_path(); + } + wal_path.append(_exec_env->wal_mgr()->tmp); + for (auto path : path_element) { + wal_path.append(path); + } + bool exists = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(), &exists)); + if (!exists) { + RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path())); } - return ss.str(); + auto res = std::rename(wal.c_str(), wal_path.string().c_str()); + if (res < 0) { + return Status::InternalError("rename fail on path " + wal); + } + LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string(); + return Status::OK(); } -bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) { +bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) { Review Comment: warning: method '_need_replay' can be made static [readability-convert-member-functions-to-static] be/src/olap/wal_table.h:56: ```diff - bool 
_need_replay(std::shared_ptr<WalInfo>);
+ static bool _need_replay(std::shared_ptr<WalInfo>);
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at: [email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
