This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 03901b9a7a1 [enhancement](group_commit): refactor replay wal code
(#29183)
03901b9a7a1 is described below
commit 03901b9a7a1970162fa71fd3892905496ef892c3
Author: huanghaibin <[email protected]>
AuthorDate: Sat Dec 30 12:59:46 2023 +0800
[enhancement](group_commit): refactor replay wal code (#29183)
---
be/src/http/action/http_stream.cpp | 16 +-
be/src/http/action/http_stream.h | 2 +-
be/src/olap/wal_dirs_info.cpp | 1 +
be/src/olap/wal_info.cpp | 41 ++
be/src/olap/wal_info.h | 38 ++
be/src/olap/wal_manager.cpp | 41 +-
be/src/olap/wal_manager.h | 17 +-
be/src/olap/wal_table.cpp | 428 +++++++++------------
be/src/olap/wal_table.h | 32 +-
be/src/runtime/group_commit_mgr.cpp | 26 +-
be/src/runtime/stream_load/stream_load_context.h | 1 +
be/src/vec/exec/format/wal/wal_reader.cpp | 2 -
be/src/vec/sink/writer/vwal_writer.cpp | 2 +-
be/test/olap/wal_manager_test.cpp | 6 +-
.../apache/doris/analysis/NativeInsertStmt.java | 2 +-
gensrc/thrift/FrontendService.thrift | 14 +-
16 files changed, 346 insertions(+), 323 deletions(-)
diff --git a/be/src/http/action/http_stream.cpp
b/be/src/http/action/http_stream.cpp
index a18b10f491f..15d2a1a18d1 100644
--- a/be/src/http/action/http_stream.cpp
+++ b/be/src/http/action/http_stream.cpp
@@ -256,7 +256,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
} else {
LOG(INFO) << "use a portion of data to request fe to obtain
column information";
ctx->is_read_schema = false;
- ctx->status = _process_put(req, ctx);
+ ctx->status = process_put(req, ctx);
}
}
@@ -272,7 +272,7 @@ void HttpStreamAction::on_chunk_data(HttpRequest* req) {
LOG(INFO) << "after all the data has been read and it has not reached
1M, it will execute "
<< "here";
ctx->is_read_schema = false;
- ctx->status = _process_put(req, ctx);
+ ctx->status = process_put(req, ctx);
}
ctx->read_data_cost_nanos += (MonotonicNanos() - start_read_data_time);
}
@@ -290,11 +290,15 @@ void
HttpStreamAction::free_handler_ctx(std::shared_ptr<void> param) {
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
}
-Status HttpStreamAction::_process_put(HttpRequest* http_req,
- std::shared_ptr<StreamLoadContext> ctx) {
+Status HttpStreamAction::process_put(HttpRequest* http_req,
+ std::shared_ptr<StreamLoadContext> ctx) {
TStreamLoadPutRequest request;
set_request_auth(&request, ctx->auth);
- request.__set_load_sql(http_req->header(HTTP_SQL));
+ if (http_req != nullptr) {
+ request.__set_load_sql(http_req->header(HTTP_SQL));
+ } else {
+ request.__set_load_sql(ctx->sql_str);
+ }
request.__set_loadId(ctx->id.to_thrift());
request.__set_label(ctx->label);
if (ctx->group_commit) {
@@ -330,7 +334,7 @@ Status HttpStreamAction::_process_put(HttpRequest* http_req,
ctx->txn_id = ctx->put_result.params.txn_conf.txn_id;
ctx->label = ctx->put_result.params.import_label;
ctx->put_result.params.__set_wal_id(ctx->wal_id);
- if (http_req->header(HTTP_GROUP_COMMIT) == "async_mode") {
+ if (http_req != nullptr && http_req->header(HTTP_GROUP_COMMIT) ==
"async_mode") {
if (!http_req->header(HttpHeaders::CONTENT_LENGTH).empty()) {
size_t content_length =
std::stol(http_req->header(HttpHeaders::CONTENT_LENGTH));
if (ctx->format == TFileFormatType::FORMAT_CSV_GZ ||
diff --git a/be/src/http/action/http_stream.h b/be/src/http/action/http_stream.h
index d4140a118d6..90a8b48fb2b 100644
--- a/be/src/http/action/http_stream.h
+++ b/be/src/http/action/http_stream.h
@@ -43,11 +43,11 @@ public:
void on_chunk_data(HttpRequest* req) override;
void free_handler_ctx(std::shared_ptr<void> ctx) override;
+ Status process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
private:
Status _on_header(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
Status _handle(HttpRequest* req, std::shared_ptr<StreamLoadContext> ctx);
- Status _process_put(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
void _save_stream_load_record(std::shared_ptr<StreamLoadContext> ctx,
const std::string& str);
Status _handle_group_commit(HttpRequest* http_req,
std::shared_ptr<StreamLoadContext> ctx);
diff --git a/be/src/olap/wal_dirs_info.cpp b/be/src/olap/wal_dirs_info.cpp
index 340d896a8c6..1b7216b0724 100644
--- a/be/src/olap/wal_dirs_info.cpp
+++ b/be/src/olap/wal_dirs_info.cpp
@@ -169,6 +169,7 @@ size_t WalDirsInfo::get_max_available_size() {
Status WalDirsInfo::update_wal_dir_limit(std::string wal_dir, size_t limit) {
for (const auto& wal_dir_info : _wal_dirs_info_vec) {
+ LOG(INFO) << "wal_dir_info:" << wal_dir_info->get_wal_dir();
if (wal_dir_info->get_wal_dir() == wal_dir) {
return wal_dir_info->update_wal_dir_limit(limit);
}
diff --git a/be/src/olap/wal_info.cpp b/be/src/olap/wal_info.cpp
new file mode 100644
index 00000000000..d93593cfaf0
--- /dev/null
+++ b/be/src/olap/wal_info.cpp
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "olap/wal_info.h"
+
+#include <utility>
+
+namespace doris {
+
+// wal_path is taken by value and moved into the member to avoid a second string copy.
+WalInfo::WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms)
+        : _wal_id(wal_id),
+          _wal_path(std::move(wal_path)),
+          _retry_num(retry_num),
+          _start_time_ms(start_time_ms) {}
+
+WalInfo::~WalInfo() = default;
+
+int64_t WalInfo::get_wal_id() {
+    return _wal_id;
+}
+
+std::string WalInfo::get_wal_path() {
+    return _wal_path;
+}
+
+int64_t WalInfo::get_retry_num() {
+    return _retry_num;
+}
+
+int64_t WalInfo::get_start_time_ms() {
+    return _start_time_ms;
+}
+
+// Plain, unsynchronized increment; concurrent callers would need external locking.
+void WalInfo::add_retry_num() {
+    _retry_num++;
+}
+
+} // namespace doris
diff --git a/be/src/olap/wal_info.h b/be/src/olap/wal_info.h
new file mode 100644
index 00000000000..0383ac68f2c
--- /dev/null
+++ b/be/src/olap/wal_info.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include <cstdint>
+#include <string>
+
+// NOTE(review): exec_env.h looks unused by this header; kept in case includers rely on it.
+#include "runtime/exec_env.h"
+
+namespace doris {
+
+// Per-WAL replay bookkeeping used by WalTable: the WAL id and file path, the
+// number of replay attempts made so far, and the start time (ms) that the
+// retry back-off interval is measured from.
+class WalInfo {
+public:
+    WalInfo(int64_t wal_id, std::string wal_path, int64_t retry_num, int64_t start_time_ms);
+    ~WalInfo();
+    int64_t get_wal_id();
+    int64_t get_retry_num();
+    int64_t get_start_time_ms();
+    std::string get_wal_path();
+    // Increments the retry counter; not internally synchronized.
+    void add_retry_num();
+
+private:
+    int64_t _wal_id;
+    std::string _wal_path;
+    int64_t _retry_num;
+    int64_t _start_time_ms;
+};
+
+} // namespace doris
diff --git a/be/src/olap/wal_manager.cpp b/be/src/olap/wal_manager.cpp
index 0fe59cdf9ca..a9c35794698 100644
--- a/be/src/olap/wal_manager.cpp
+++ b/be/src/olap/wal_manager.cpp
@@ -82,6 +82,9 @@ Status WalManager::init() {
RETURN_IF_ERROR(_init_wal_dirs_conf());
RETURN_IF_ERROR(_init_wal_dirs());
RETURN_IF_ERROR(_init_wal_dirs_info());
+ for (auto wal_dir : _wal_dirs) {
+ RETURN_IF_ERROR(scan_wals(wal_dir));
+ }
return Thread::create(
"WalMgr", "replay_wal", [this]() {
static_cast<void>(this->replay()); },
&_replay_thread);
@@ -112,7 +115,7 @@ Status WalManager::_init_wal_dirs_conf() {
Status WalManager::_init_wal_dirs() {
bool exists = false;
for (auto wal_dir : _wal_dirs) {
- std::string tmp_dir = wal_dir + "/tmp";
+ std::string tmp_dir = wal_dir + "/" + tmp;
LOG(INFO) << "wal_dir:" << wal_dir << ",tmp_dir:" << tmp_dir;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_dir,
&exists));
if (!exists) {
@@ -122,7 +125,6 @@ Status WalManager::_init_wal_dirs() {
if (!exists) {
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(tmp_dir));
}
- RETURN_IF_ERROR(scan_wals(wal_dir));
}
return Status::OK();
}
@@ -164,15 +166,15 @@ Status WalManager::_init_wal_dirs_info() {
&_update_wal_dirs_info_thread);
}
-void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WAL_STATUS wal_status) {
+void WalManager::add_wal_status_queue(int64_t table_id, int64_t wal_id,
WalStatus wal_status) {
std::lock_guard<std::shared_mutex> wrlock(_wal_status_lock);
LOG(INFO) << "add wal queue "
<< ",table_id:" << table_id << ",wal_id:" << wal_id <<
",status:" << wal_status;
auto it = _wal_status_queues.find(table_id);
if (it == _wal_status_queues.end()) {
- std::unordered_map<int64_t, WAL_STATUS> tmp;
- tmp.emplace(wal_id, wal_status);
- _wal_status_queues.emplace(table_id, tmp);
+ std::unordered_map<int64_t, WalStatus> tmp_map;
+ tmp_map.emplace(wal_id, wal_status);
+ _wal_status_queues.emplace(table_id, tmp_map);
} else {
it->second.emplace(wal_id, wal_status);
}
@@ -305,12 +307,12 @@ Status WalManager::scan_wals(const std::string& wal_path)
{
LOG(WARNING) << "Failed list files for dir=" << wal_path << ", st=" <<
st.to_string();
return st;
}
- for (const auto& db_id : dbs) {
- if (db_id.is_file) {
+ for (const auto& database_id : dbs) {
+ if (database_id.is_file || database_id.file_name == tmp) {
continue;
}
std::vector<io::FileInfo> tables;
- auto db_path = wal_path + "/" + db_id.file_name;
+ auto db_path = wal_path + "/" + database_id.file_name;
st = io::global_local_filesystem()->list(db_path, false, &tables,
&exists);
if (!st.ok()) {
LOG(WARNING) << "Failed list files for dir=" << db_path << ", st="
<< st.to_string();
@@ -342,20 +344,16 @@ Status WalManager::scan_wals(const std::string& wal_path)
{
int64_t wal_id =
std::strtoll(wal.file_name.substr(0,
pos).c_str(), NULL, 10);
_wal_path_map.emplace(wal_id, wal_file);
+ int64_t db_id =
std::strtoll(database_id.file_name.c_str(), NULL, 10);
int64_t tb_id =
std::strtoll(table_id.file_name.c_str(), NULL, 10);
- add_wal_status_queue(tb_id, wal_id,
WalManager::WAL_STATUS::REPLAY);
+ add_wal_status_queue(tb_id, wal_id,
WalManager::WalStatus::REPLAY);
+ RETURN_IF_ERROR(add_recover_wal(db_id, tb_id, wal_id,
wal_file));
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}",
e.what());
}
}
}
- st = add_recover_wal(std::stoll(db_id.file_name),
std::stoll(table_id.file_name), res);
count += res.size();
- if (!st.ok()) {
- LOG(WARNING) << "Failed add replay wal, db=" << db_id.file_name
- << ", table=" << table_id.file_name << ", st=" <<
st.to_string();
- return st;
- }
}
}
LOG(INFO) << "Finish list all wals, size:" << count;
@@ -396,7 +394,8 @@ Status WalManager::replay() {
return Status::OK();
}
-Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id,
std::vector<std::string> wals) {
+Status WalManager::add_recover_wal(int64_t db_id, int64_t table_id, int64_t
wal_id,
+ std::string wal) {
std::lock_guard<std::shared_mutex> wrlock(_lock);
std::shared_ptr<WalTable> table_ptr;
auto it = _table_map.find(table_id);
@@ -406,12 +405,10 @@ Status WalManager::add_recover_wal(int64_t db_id, int64_t
table_id, std::vector<
} else {
table_ptr = it->second;
}
- table_ptr->add_wals(wals);
+ table_ptr->add_wal(wal_id, wal);
#ifndef BE_TEST
- for (auto wal : wals) {
- RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
- RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
- }
+ RETURN_IF_ERROR(update_wal_dir_limit(_get_base_wal_path(wal)));
+ RETURN_IF_ERROR(update_wal_dir_used(_get_base_wal_path(wal)));
#endif
return Status::OK();
}
diff --git a/be/src/olap/wal_manager.h b/be/src/olap/wal_manager.h
index 8d330a64bb9..4b42beaf455 100644
--- a/be/src/olap/wal_manager.h
+++ b/be/src/olap/wal_manager.h
@@ -47,7 +47,7 @@ class WalManager {
ENABLE_FACTORY_CREATOR(WalManager);
public:
- enum WAL_STATUS {
+ enum WalStatus {
PREPARE = 0,
REPLAY,
CREATE,
@@ -56,6 +56,7 @@ public:
public:
WalManager(ExecEnv* exec_env, const std::string& wal_dir);
~WalManager();
+ // used for wal
Status delete_wal(int64_t wal_id, size_t block_queue_pre_allocated = 0);
Status init();
Status scan_wals(const std::string& wal_path);
@@ -64,13 +65,13 @@ public:
Status create_wal_writer(int64_t wal_id, std::shared_ptr<WalWriter>&
wal_writer);
Status scan();
size_t get_wal_table_size(int64_t table_id);
- Status add_recover_wal(int64_t db_id, int64_t table_id,
std::vector<std::string> wals);
+ Status add_recover_wal(int64_t db_id, int64_t table_id, int64_t wal_id,
std::string wal);
Status add_wal_path(int64_t db_id, int64_t table_id, int64_t wal_id, const
std::string& label,
std::string& base_path);
Status get_wal_path(int64_t wal_id, std::string& wal_path);
Status get_wal_status_queue_size(const PGetWalQueueSizeRequest* request,
PGetWalQueueSizeResponse* response);
- void add_wal_status_queue(int64_t table_id, int64_t wal_id, WAL_STATUS
wal_status);
+ void add_wal_status_queue(int64_t table_id, int64_t wal_id, WalStatus
wal_status);
Status erase_wal_status_queue(int64_t table_id, int64_t wal_id);
void print_wal_status_queue();
void stop();
@@ -80,6 +81,7 @@ public:
void erase_wal_column_index(int64_t wal_id);
Status get_wal_column_index(int64_t wal_id, std::vector<size_t>&
column_index);
+ // used for limit
Status update_wal_dir_limit(const std::string& wal_dir, size_t limit = -1);
Status update_wal_dir_used(const std::string& wal_dir, size_t used = -1);
Status update_wal_dir_pre_allocated(const std::string& wal_dir, size_t
pre_allocated,
@@ -88,6 +90,7 @@ public:
size_t get_max_available_size();
private:
+ // used for limit
Status _init_wal_dirs_conf();
Status _init_wal_dirs();
Status _init_wal_dirs_info();
@@ -99,11 +102,13 @@ public:
// used for be ut
size_t wal_limit_test_bytes;
+ const std::string tmp = "tmp";
+
private:
+ //used for wal
ExecEnv* _exec_env = nullptr;
std::shared_mutex _lock;
scoped_refptr<Thread> _replay_thread;
- scoped_refptr<Thread> _update_wal_dirs_info_thread;
CountDownLatch _stop_background_threads_latch;
std::map<int64_t, std::shared_ptr<WalTable>> _table_map;
std::vector<std::string> _wal_dirs;
@@ -111,11 +116,13 @@ private:
std::shared_mutex _wal_status_lock;
std::unordered_map<int64_t, std::string> _wal_path_map;
std::unordered_map<int64_t, std::shared_ptr<WalWriter>>
_wal_id_to_writer_map;
- std::unordered_map<int64_t, std::unordered_map<int64_t, WAL_STATUS>>
_wal_status_queues;
+ std::unordered_map<int64_t, std::unordered_map<int64_t, WalStatus>>
_wal_status_queues;
std::atomic<bool> _stop;
std::shared_mutex _wal_column_id_map_lock;
std::unordered_map<int64_t, std::vector<size_t>&> _wal_column_id_map;
std::unique_ptr<doris::ThreadPool> _thread_pool;
+ // used for limit
+ scoped_refptr<Thread> _update_wal_dirs_info_thread;
std::unique_ptr<WalDirsInfo> _wal_dirs_info;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/olap/wal_table.cpp b/be/src/olap/wal_table.cpp
index 7f98c410b07..a1c9a12f3e1 100644
--- a/be/src/olap/wal_table.cpp
+++ b/be/src/olap/wal_table.cpp
@@ -17,19 +17,16 @@
#include "olap/wal_table.h"
-#include <event2/bufferevent.h>
-#include <event2/event.h>
-#include <event2/event_struct.h>
-#include <event2/http.h>
#include <thrift/protocol/TDebugProtocol.h>
-#include "evhttp.h"
+#include "http/action/http_stream.h"
#include "http/action/stream_load.h"
#include "http/ev_http_server.h"
#include "http/http_common.h"
#include "http/http_headers.h"
#include "http/utils.h"
#include "io/fs/local_file_system.h"
+#include "io/fs/stream_load_pipe.h"
#include "olap/wal_manager.h"
#include "runtime/client_cache.h"
#include "runtime/fragment_mgr.h"
@@ -41,125 +38,149 @@
namespace doris {
WalTable::WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id)
- : _exec_env(exec_env), _db_id(db_id), _table_id(table_id),
_stop(false) {}
+ : _exec_env(exec_env), _db_id(db_id), _table_id(table_id),
_stop(false) {
+ _http_stream_action = std::make_shared<HttpStreamAction>(exec_env);
+}
WalTable::~WalTable() {}
#ifdef BE_TEST
-std::string k_request_line;
+Status k_stream_load_exec_status;
#endif
-bool retry = false;
-
-void WalTable::add_wals(std::vector<std::string> wals) {
+void WalTable::add_wal(int64_t wal_id, std::string wal) {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
- for (const auto& wal : wals) {
- LOG(INFO) << "add replay wal " << wal;
- _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(), false});
- }
+ LOG(INFO) << "add replay wal " << wal;
+ auto wal_info = std::make_shared<WalInfo>(wal_id, wal, 0, UnixMillis());
+ _replay_wal_map.emplace(wal, wal_info);
}
-Status WalTable::replay_wals() {
+void WalTable::pick_relay_wals() {
+ std::lock_guard<std::mutex> lock(_replay_wal_lock);
std::vector<std::string> need_replay_wals;
std::vector<std::string> need_erase_wals;
- {
- std::lock_guard<std::mutex> lock(_replay_wal_lock);
- if (_replay_wal_map.empty()) {
- return Status::OK();
- }
- VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" <<
_table_id
- << ", wal size=" << _replay_wal_map.size();
- for (auto& [wal, info] : _replay_wal_map) {
- auto& [retry_num, start_ts, replaying] = info;
- if (replaying) {
- LOG(INFO) << wal << " is replaying, skip this round";
- return Status::OK();
- }
- if (retry_num >= config::group_commit_replay_wal_retry_num) {
- LOG(WARNING) << "All replay wal failed, db=" << _db_id << ",
table=" << _table_id
- << ", wal=" << wal
- << ", retry_num=" <<
config::group_commit_replay_wal_retry_num;
- std::string rename_path = _get_tmp_path(wal);
- LOG(INFO) << "rename wal from " << wal << " to " <<
rename_path;
- std::rename(wal.c_str(), rename_path.c_str());
- need_erase_wals.push_back(wal);
- continue;
- }
- if (_need_replay(info)) {
- need_replay_wals.push_back(wal);
+ for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++)
{
+ auto wal_info = it->second;
+ if (wal_info->get_retry_num() >=
config::group_commit_replay_wal_retry_num) {
+ LOG(WARNING) << "All replay wal failed, db=" << _db_id << ",
table=" << _table_id
+ << ", wal=" << it->first << ", retry_num=" <<
wal_info->get_retry_num();
+ auto st = _rename_to_tmp_path(it->first);
+ if (!st.ok()) {
+ LOG(WARNING) << "rename " << it->first << " fail"
+ << ",st:" << st.to_string();
}
+ need_erase_wals.push_back(it->first);
+ continue;
}
- std::sort(need_replay_wals.begin(), need_replay_wals.end());
- for (const auto& wal : need_erase_wals) {
- if (_replay_wal_map.erase(wal)) {
- LOG(INFO) << "erase wal " << wal << " from _replay_wal_map";
- } else {
- LOG(WARNING) << "fail to erase wal " << wal << " from
_replay_wal_map";
- }
+ if (_need_replay(wal_info)) {
+ need_replay_wals.push_back(it->first);
}
}
+ for (const auto& wal : need_erase_wals) {
+ _replay_wal_map.erase(wal);
+ }
+ std::sort(need_replay_wals.begin(), need_replay_wals.end());
for (const auto& wal : need_replay_wals) {
+ _replaying_queue.emplace_back(_replay_wal_map[wal]);
+ _replay_wal_map.erase(wal);
+ }
+}
+
+Status WalTable::relay_wal_one_by_one() {
+ std::vector<std::shared_ptr<WalInfo>> need_retry_wals;
+ std::vector<std::shared_ptr<WalInfo>> need_delete_wals;
+ while (!_replaying_queue.empty()) {
+ std::shared_ptr<WalInfo> wal_info = nullptr;
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
- if (_stop.load()) {
- break;
+ wal_info = _replaying_queue.front();
+ _replaying_queue.pop_front();
+ }
+ wal_info->add_retry_num();
+ auto st = _replay_wal_internal(wal_info->get_wal_path());
+ if (!st.ok()) {
+ LOG(WARNING) << "failed replay wal, db=" << _db_id << ", table="
<< _table_id
+ << ", wal=" << wal_info->get_wal_path() << ", st=" <<
st.to_string();
+ if (!st.is<ErrorCode::NOT_FOUND>()) {
+ need_retry_wals.push_back(wal_info);
} else {
- auto it = _replay_wal_map.find(wal);
- if (it != _replay_wal_map.end()) {
- auto& [retry_num, start_time, replaying] = it->second;
- replaying = true;
- }
+ need_delete_wals.push_back(wal_info);
}
+ } else {
+ need_delete_wals.push_back(wal_info);
+ }
+ VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id
+ << ", wal=" << wal_info->get_wal_path() << ", st=" <<
st.to_string();
+ }
+ {
+ std::lock_guard<std::mutex> lock(_replay_wal_lock);
+ for (auto retry_wal_info : need_retry_wals) {
+ _replay_wal_map.emplace(retry_wal_info->get_wal_path(),
retry_wal_info);
}
- auto st = _replay_wal_internal(wal);
+ }
+ for (auto delete_wal_info : need_delete_wals) {
+ auto st = _delete_wal(delete_wal_info->get_wal_id());
if (!st.ok()) {
- std::lock_guard<std::mutex> lock(_replay_wal_lock);
- auto it = _replay_wal_map.find(wal);
- if (it != _replay_wal_map.end()) {
- auto& [retry_num, start_time, replaying] = it->second;
- replaying = false;
- }
- LOG(WARNING) << "failed replay wal, drop this round, db=" << _db_id
- << ", table=" << _table_id << ", wal=" << wal << ",
st=" << st.to_string();
- break;
+ LOG(WARNING) << "fail to delete wal " <<
delete_wal_info->get_wal_path();
}
- VLOG_NOTICE << "replay wal, db=" << _db_id << ", table=" << _table_id
<< ", label=" << wal
- << ", st=" << st.to_string();
}
return Status::OK();
}
-
-std::string WalTable::_get_tmp_path(const std::string wal) {
- std::vector<std::string> path_element;
- doris::vectorized::WalReader::string_split(wal, "/", path_element);
- std::stringstream ss;
- int index = 0;
- while (index < path_element.size() - 3) {
- ss << path_element[index] << "/";
- index++;
+Status WalTable::replay_wals() {
+ {
+ std::lock_guard<std::mutex> lock(_replay_wal_lock);
+ if (_replay_wal_map.empty()) {
+ LOG(INFO) << "_replay_wal_map is empty,skip relaying";
+ return Status::OK();
+ }
+ if (!_replaying_queue.empty()) {
+ LOG(INFO) << "_replaying_queue is not empty,skip relaying";
+ return Status::OK();
+ }
}
- ss << "tmp/";
- while (index < path_element.size()) {
- if (index != path_element.size() - 1) {
- ss << path_element[index] << "_";
- } else {
- ss << path_element[index];
+ VLOG_DEBUG << "Start replay wals for db=" << _db_id << ", table=" <<
_table_id
+ << ", wal size=" << _replay_wal_map.size();
+ pick_relay_wals();
+ RETURN_IF_ERROR(relay_wal_one_by_one());
+ return Status::OK();
+}
+
+Status WalTable::_rename_to_tmp_path(const std::string wal) {
+ io::Path wal_path = wal;
+ std::list<std::string> path_element;
+ for (int i = 0; i < 3; ++i) {
+ if (!wal_path.has_parent_path()) {
+ return Status::InternalError("parent path is not enough when
rename " + wal);
}
- index++;
+ path_element.push_front(wal_path.filename().string());
+ wal_path = wal_path.parent_path();
}
- return ss.str();
+ wal_path.append(_exec_env->wal_mgr()->tmp);
+ for (auto path : path_element) {
+ wal_path.append(path);
+ }
+ bool exists = false;
+
RETURN_IF_ERROR(io::global_local_filesystem()->exists(wal_path.parent_path(),
&exists));
+ if (!exists) {
+
RETURN_IF_ERROR(io::global_local_filesystem()->create_directory(wal_path.parent_path()));
+ }
+ auto res = std::rename(wal.c_str(), wal_path.string().c_str());
+ if (res < 0) {
+ return Status::InternalError("rename fail on path " + wal);
+ }
+ LOG(INFO) << "rename wal from " << wal << " to " << wal_path.string();
+ return Status::OK();
}
-bool WalTable::_need_replay(const doris::WalTable::replay_wal_info& info) {
+bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
#ifndef BE_TEST
- auto& [retry_num, start_ts, replaying] = info;
- auto replay_interval =
- pow(2, retry_num) *
config::group_commit_replay_wal_retry_interval_seconds * 1000;
- return UnixMillis() - start_ts >= replay_interval;
+ auto replay_interval = pow(2, wal_info->get_retry_num()) *
+
config::group_commit_replay_wal_retry_interval_seconds * 1000;
+ return UnixMillis() - wal_info->get_start_time_ms() >= replay_interval;
#else
return true;
#endif
}
-Status WalTable::_abort_txn(int64_t db_id, int64_t wal_id) {
+Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
TLoadTxnRollbackRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
@@ -181,47 +202,23 @@ Status WalTable::_abort_txn(int64_t db_id, int64_t
wal_id) {
Status WalTable::_replay_wal_internal(const std::string& wal) {
LOG(INFO) << "Start replay wal for db=" << _db_id << ", table=" <<
_table_id << ", wal=" << wal;
- // start a new stream load
- {
- std::lock_guard<std::mutex> lock(_replay_wal_lock);
- auto it = _replay_wal_map.find(wal);
- if (it != _replay_wal_map.end()) {
- auto& [retry_num, start_time, replaying] = it->second;
- ++retry_num;
- replaying = true;
- } else {
- LOG(WARNING) << "can not find wal in stream load replay map. db="
<< _db_id
- << ", table=" << _table_id << ", wal=" << wal;
- return Status::OK();
- }
- }
std::shared_ptr<std::pair<int64_t, std::string>> pair = nullptr;
- RETURN_IF_ERROR(_get_wal_info(wal, pair));
+ RETURN_IF_ERROR(_parse_wal_path(wal, pair));
auto wal_id = pair->first;
auto label = pair->second;
#ifndef BE_TEST
- auto st = _abort_txn(_db_id, wal_id);
+ auto st = _try_abort_txn(_db_id, wal_id);
if (!st.ok()) {
LOG(WARNING) << "abort txn " << wal_id << " fail";
}
- auto get_st = _get_column_info(_db_id, _table_id);
- if (!get_st.ok()) {
- if (get_st.is<ErrorCode::NOT_FOUND>()) {
- {
- std::lock_guard<std::mutex> lock(_replay_wal_lock);
- _replay_wal_map.erase(wal);
- }
- RETURN_IF_ERROR(_delete_wal(wal_id));
- }
- return get_st;
- }
+ RETURN_IF_ERROR(_get_column_info(_db_id, _table_id));
#endif
- RETURN_IF_ERROR(_send_request(wal_id, wal, label));
+ RETURN_IF_ERROR(_replay_one_txn_with_stremaload(wal_id, wal, label));
return Status::OK();
}
-Status WalTable::_get_wal_info(const std::string& wal,
- std::shared_ptr<std::pair<int64_t,
std::string>>& pair) {
+Status WalTable::_parse_wal_path(const std::string& wal,
+ std::shared_ptr<std::pair<int64_t,
std::string>>& pair) {
std::vector<std::string> path_element;
doris::vectorized::WalReader::string_split(wal, "/", path_element);
auto pos = path_element[path_element.size() - 1].find("_");
@@ -236,171 +233,104 @@ Status WalTable::_get_wal_info(const std::string& wal,
return Status::OK();
}
-void http_request_done(struct evhttp_request* req, void* arg) {
- std::stringstream out;
- std::string status;
- std::string msg;
- std::string wal_id;
- size_t len = 0;
- if (req != nullptr) {
- auto input = evhttp_request_get_input_buffer(req);
- char* request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
- while (request_line != nullptr) {
- std::string s(request_line);
- out << request_line;
- request_line = evbuffer_readln(input, &len, EVBUFFER_EOL_CRLF);
- }
- auto out_str = out.str();
- LOG(INFO) << "replay wal out_str:" << out_str;
- rapidjson::Document doc;
- if (!out_str.empty()) {
- doc.Parse(out.str().c_str());
- status = std::string(doc["Status"].GetString());
- msg = std::string(doc["Message"].GetString());
- LOG(INFO) << "replay wal status:" << status << ",msg:" << msg;
- if (status.find("Fail") != status.npos) {
- if (msg.find("Label") != msg.npos &&
- msg.find("has already been used") != msg.npos) {
- retry = false;
- } else {
- retry = true;
- }
- } else {
- retry = false;
- }
- } else {
- retry = true;
- }
- } else {
- LOG(WARNING) << "req is null";
- }
-
- if (arg != nullptr) {
- event_base_loopbreak((struct event_base*)arg);
- } else {
- LOG(WARNING) << "arg is null";
- }
-}
-
-Status WalTable::_send_request(int64_t wal_id, const std::string& wal, const
std::string& label) {
-#ifndef BE_TEST
- struct event_base* base = nullptr;
- struct evhttp_connection* conn = nullptr;
- struct evhttp_request* req = nullptr;
- retry = false;
- event_init();
- base = event_base_new();
- conn = evhttp_connection_new("127.0.0.1", doris::config::webserver_port);
- evhttp_connection_set_base(conn, base);
- req = evhttp_request_new(http_request_done, base);
- evhttp_add_header(req->output_headers, HTTP_LABEL_KEY.c_str(),
label.c_str());
- evhttp_add_header(req->output_headers, HTTP_AUTH_CODE.c_str(),
std::to_string(wal_id).c_str());
- evhttp_add_header(req->output_headers, HTTP_WAL_ID_KY.c_str(),
std::to_string(wal_id).c_str());
+Status WalTable::_construct_sql_str(const std::string& wal, const std::string&
label,
+ std::string& sql_str, std::vector<size_t>&
index_vector) {
std::string columns;
RETURN_IF_ERROR(_read_wal_header(wal, columns));
std::vector<std::string> column_id_element;
doris::vectorized::WalReader::string_split(columns, ",",
column_id_element);
- std::vector<size_t> index_vector;
std::stringstream ss_name;
std::stringstream ss_id;
int index_raw = 0;
for (auto column_id_str : column_id_element) {
try {
int64_t column_id = std::strtoll(column_id_str.c_str(), NULL, 10);
- auto it = _column_id_name_map.find(column_id);
- auto it2 = _column_id_index_map.find(column_id);
- if (it != _column_id_name_map.end() && it2 !=
_column_id_index_map.end()) {
- ss_name << "`" << it->second << "`,";
- ss_id << "c" <<
std::to_string(_column_id_index_map[column_id]) << ",";
+ auto it = _column_id_info_map.find(column_id);
+ if (it != _column_id_info_map.end()) {
+ ss_name << "`" << it->second->first << "`,";
+ ss_id << "c" << std::to_string(it->second->second) << ",";
index_vector.emplace_back(index_raw);
- _column_id_name_map.erase(column_id);
- _column_id_index_map.erase(column_id);
}
index_raw++;
} catch (const std::invalid_argument& e) {
return Status::InvalidArgument("Invalid format, {}", e.what());
}
}
- _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
auto name = ss_name.str().substr(0, ss_name.str().size() - 1);
auto id = ss_id.str().substr(0, ss_id.str().size() - 1);
std::stringstream ss;
ss << "insert into doris_internal_table_id(" << _table_id << ") WITH LABEL
" << label << " ("
<< name << ") select " << id << " from http_stream(\"format\" =
\"wal\", \"table_id\" = \""
<< std::to_string(_table_id) << "\")";
- evhttp_add_header(req->output_headers, HTTP_SQL.c_str(), ss.str().c_str());
- evbuffer* output = evhttp_request_get_output_buffer(req);
- evbuffer_add_printf(output, "replay wal %s",
std::to_string(wal_id).c_str());
-
- evhttp_make_request(conn, req, EVHTTP_REQ_PUT, "/api/_http_stream");
- evhttp_connection_set_timeout(req->evcon, 300);
-
- event_base_dispatch(base);
- evhttp_connection_free(conn);
- event_base_free(base);
+ sql_str = ss.str().data();
+ return Status::OK();
+}
+// Replays one WAL via an internal stream load; commits on success, rolls back on load failure.
+Status WalTable::_handle_stream_load(int64_t wal_id, const std::string& wal,
+                                     const std::string& label) {
+    std::string sql_str;
+    std::vector<size_t> index_vector;
+    RETURN_IF_ERROR(_construct_sql_str(wal, label, sql_str, index_vector));
+    // Erased on every path below; the manager's map appears to hold a reference to index_vector.
+    _exec_env->wal_mgr()->add_wal_column_index(wal_id, index_vector);
+    auto ctx = std::make_shared<StreamLoadContext>(_exec_env);
+    ctx->sql_str = sql_str;
+    ctx->wal_id = wal_id;
+    ctx->auth.auth_code = wal_id;
+    ctx->label = label;
+    auto st = _http_stream_action->process_put(nullptr, ctx);
+    if (st.ok()) {
+        st = ctx->future.get(); // wait stream load finish, without returning early
+    }
+    if (st.ok() && ctx->status.ok()) {
+        st = _exec_env->stream_load_executor()->commit_txn(ctx.get());
+    } else if (st.ok()) {
+        LOG(WARNING) << "handle streaming load failed, id=" << ctx->id
+                     << ", errmsg=" << ctx->status;
+        _exec_env->stream_load_executor()->rollback_txn(ctx.get());
+        st = ctx->status;
+    }
+    _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
+    LOG(INFO) << "relay wal id=" << wal_id << ",st=" << st.to_string();
+    return st;
+}
+Status WalTable::_replay_one_txn_with_stremaload(int64_t wal_id, const
std::string& wal,
+ const std::string& label) {
+ bool success = false;
+#ifndef BE_TEST
+ auto st = _handle_stream_load(wal_id, wal, label);
+ auto msg = st.msg();
+ success = st.ok() || st.is<ErrorCode::PUBLISH_TIMEOUT>() ||
+ msg.find("LabelAlreadyUsedException") != msg.npos;
#else
- std::stringstream out;
- out << k_request_line;
- auto out_str = out.str();
- rapidjson::Document doc;
- doc.Parse(out_str.c_str());
- auto status = std::string(doc["Status"].GetString());
- if (status.find("Fail") != status.npos) {
- retry = true;
- } else {
- retry = false;
- }
+ success = k_stream_load_exec_status.ok();
#endif
- if (retry) {
- LOG(INFO) << "fail to replay wal =" << wal;
- std::lock_guard<std::mutex> lock(_replay_wal_lock);
- auto it = _replay_wal_map.find(wal);
- if (it != _replay_wal_map.end()) {
- auto& [retry_num, start_time, replaying] = it->second;
- replaying = false;
- } else {
- _replay_wal_map.emplace(wal, replay_wal_info {0, UnixMillis(),
false});
- }
- } else {
+ if (success) {
LOG(INFO) << "success to replay wal =" << wal;
- RETURN_IF_ERROR(_delete_wal(wal_id));
- std::lock_guard<std::mutex> lock(_replay_wal_lock);
- if (_replay_wal_map.erase(wal)) {
- LOG(INFO) << "erase " << wal << " from _replay_wal_map";
- } else {
- LOG(WARNING) << "fail to erase " << wal << " from _replay_wal_map";
- }
+ } else {
+ LOG(INFO) << "fail to replay wal =" << wal;
+ return Status::InternalError("fail to replay wal =" + wal);
}
- _exec_env->wal_mgr()->erase_wal_column_index(wal_id);
return Status::OK();
}
void WalTable::stop() {
- bool done = true;
do {
{
std::lock_guard<std::mutex> lock(_replay_wal_lock);
if (!this->_stop.load()) {
this->_stop.store(true);
}
- auto it = _replay_wal_map.begin();
- for (; it != _replay_wal_map.end(); it++) {
- auto& [retry_num, start_time, replaying] = it->second;
- if (replaying) {
- break;
- }
- }
- if (it != _replay_wal_map.end()) {
- done = false;
- } else {
- done = true;
+ if (_replay_wal_map.empty() && _replaying_queue.empty()) {
+ break;
}
- }
- if (!done) {
+ LOG(INFO) << "stopping wal_table,wait for relay wal task done, now
"
+ << _replay_wal_map.size() << " wals wait to replay, "
+ << _replaying_queue.size() << " wals are replaying";
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
- } while (!done);
+ } while (true);
}
size_t WalTable::size() {
@@ -429,13 +359,13 @@ Status WalTable::_get_column_info(int64_t db_id, int64_t
tb_id) {
}
std::vector<TColumnInfo> column_element = result.columns;
int64_t column_index = 1;
- _column_id_name_map.clear();
- _column_id_index_map.clear();
+ _column_id_info_map.clear();
for (auto column : column_element) {
- auto column_name = column.columnName;
- auto column_id = column.columnId;
- _column_id_name_map.emplace(column_id, column_name);
- _column_id_index_map.emplace(column_id, column_index);
+ auto column_name = column.column_name;
+ auto column_id = column.column_id;
+ std::shared_ptr<ColumnInfo> column_pair =
+ std::make_shared<ColumnInfo>(std::make_pair(column_name,
column_index));
+ _column_id_info_map.emplace(column_id, column_pair);
column_index++;
}
}
diff --git a/be/src/olap/wal_table.h b/be/src/olap/wal_table.h
index e3d66d577a2..251e8d51a61 100644
--- a/be/src/olap/wal_table.h
+++ b/be/src/olap/wal_table.h
@@ -24,6 +24,8 @@
#include "gen_cpp/FrontendService.h"
#include "gen_cpp/FrontendService_types.h"
#include "gen_cpp/HeartbeatService_types.h"
+#include "http/action/http_stream.h"
+#include "olap/wal_info.h"
#include "runtime/exec_env.h"
#include "runtime/stream_load/stream_load_context.h"
namespace doris {
@@ -32,25 +34,32 @@ public:
WalTable(ExecEnv* exec_env, int64_t db_id, int64_t table_id);
~WalTable();
// used when be start and there are wals need to do recovery
- void add_wals(std::vector<std::string> wals);
+ void add_wal(int64_t wal_id, std::string wal);
+ void pick_relay_wals();
+ Status relay_wal_one_by_one();
Status replay_wals();
size_t size();
void stop();
public:
- // <retry_num, start_time_ms, is_doing_replay>
- using replay_wal_info = std::tuple<int64_t, int64_t, bool>;
+ // <column_name, column_index>
+ using ColumnInfo = std::pair<std::string, int64_t>;
private:
- Status _get_wal_info(const std::string& wal,
std::shared_ptr<std::pair<int64_t, std::string>>&);
- std::string _get_tmp_path(const std::string wal);
- Status _send_request(int64_t wal_id, const std::string& wal, const
std::string& label);
- Status _abort_txn(int64_t db_id, int64_t wal_id);
+ Status _parse_wal_path(const std::string& wal,
+ std::shared_ptr<std::pair<int64_t, std::string>>&);
+ Status _rename_to_tmp_path(const std::string wal);
+ Status _replay_one_txn_with_stremaload(int64_t wal_id, const std::string&
wal,
+ const std::string& label);
+ Status _try_abort_txn(int64_t db_id, int64_t wal_id);
Status _get_column_info(int64_t db_id, int64_t tb_id);
Status _read_wal_header(const std::string& wal, std::string& columns);
- bool _need_replay(const replay_wal_info& info);
+ bool _need_replay(std::shared_ptr<WalInfo>);
Status _replay_wal_internal(const std::string& wal);
Status _delete_wal(int64_t wal_id);
+ Status _construct_sql_str(const std::string& wal, const std::string& label,
+ std::string& sql_str, std::vector<size_t>&
index_vector);
+ Status _handle_stream_load(int64_t wal_id, const std::string& wal, const
std::string& label);
private:
ExecEnv* _exec_env;
@@ -60,9 +69,10 @@ private:
std::string _split = "_";
mutable std::mutex _replay_wal_lock;
// key is wal_id
- std::map<std::string, replay_wal_info> _replay_wal_map;
+ std::map<std::string, std::shared_ptr<WalInfo>> _replay_wal_map;
+ std::list<std::shared_ptr<WalInfo>> _replaying_queue;
std::atomic<bool> _stop;
- std::map<int64_t, std::string> _column_id_name_map;
- std::map<int64_t, int64_t> _column_id_index_map;
+ std::map<int64_t, std::shared_ptr<ColumnInfo>> _column_id_info_map;
+ std::shared_ptr<HttpStreamAction> _http_stream_action;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 28df650f13c..acb40ae6c78 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -51,7 +51,11 @@ Status
LoadBlockQueue::add_block(std::shared_ptr<vectorized::Block> block, bool
if (block->rows() > 0) {
_block_queue.push_back(block);
if (write_wal) {
- RETURN_IF_ERROR(_v_wal_writer->write_wal(block.get()));
+ auto st = _v_wal_writer->write_wal(block.get());
+ if (!st.ok()) {
+ _cancel_without_lock(st);
+ return st;
+ }
}
_all_block_queues_bytes->fetch_add(block->bytes(),
std::memory_order_relaxed);
}
@@ -278,11 +282,17 @@ Status GroupCommitTable::_create_group_commit_load(
_load_block_queues.emplace(instance_id, load_block_queue);
_need_plan_fragment = false;
_exec_env->wal_mgr()->add_wal_status_queue(_table_id, txn_id,
-
WalManager::WAL_STATUS::PREPARE);
+
WalManager::WalStatus::PREPARE);
//create wal
- RETURN_IF_ERROR(
- load_block_queue->create_wal(_db_id, _table_id, txn_id, label,
_exec_env->wal_mgr(),
- params.desc_tbl.slotDescriptors,
be_exe_version));
+ if (!is_pipeline) {
+ RETURN_IF_ERROR(load_block_queue->create_wal(
+ _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
+ params.desc_tbl.slotDescriptors, be_exe_version));
+ } else {
+ RETURN_IF_ERROR(load_block_queue->create_wal(
+ _db_id, _table_id, txn_id, label, _exec_env->wal_mgr(),
+ pipeline_params.desc_tbl.slotDescriptors, be_exe_version));
+ }
_cv.notify_all();
}
st = _exec_plan_fragment(_db_id, _table_id, label, txn_id, is_pipeline,
params,
@@ -367,10 +377,8 @@ Status GroupCommitTable::_finish_group_commit_load(int64_t
db_id, int64_t table_
} else {
std::string wal_path;
RETURN_IF_ERROR(_exec_env->wal_mgr()->get_wal_path(txn_id, wal_path));
- RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id,
-
std::vector<std::string> {wal_path}));
- _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id,
-
WalManager::WAL_STATUS::REPLAY);
+ RETURN_IF_ERROR(_exec_env->wal_mgr()->add_recover_wal(db_id, table_id,
txn_id, wal_path));
+ _exec_env->wal_mgr()->add_wal_status_queue(table_id, txn_id,
WalManager::WalStatus::REPLAY);
}
std::stringstream ss;
ss << "finish group commit, db_id=" << db_id << ", table_id=" << table_id
<< ", label=" << label
diff --git a/be/src/runtime/stream_load/stream_load_context.h
b/be/src/runtime/stream_load/stream_load_context.h
index b530242743e..e57996af9e1 100644
--- a/be/src/runtime/stream_load/stream_load_context.h
+++ b/be/src/runtime/stream_load/stream_load_context.h
@@ -139,6 +139,7 @@ public:
int64_t table_id = -1;
int64_t schema_version = -1;
std::string label;
+ std::string sql_str;
// optional
std::string sub_label;
double max_filter_ratio = 0.0;
diff --git a/be/src/vec/exec/format/wal/wal_reader.cpp
b/be/src/vec/exec/format/wal/wal_reader.cpp
index 01846acc04a..32fd66cc309 100644
--- a/be/src/vec/exec/format/wal/wal_reader.cpp
+++ b/be/src/vec/exec/format/wal/wal_reader.cpp
@@ -88,8 +88,6 @@ void WalReader::string_split(const std::string& str, const
std::string& splits,
Status WalReader::get_columns(std::unordered_map<std::string, TypeDescriptor>*
name_to_type,
std::unordered_set<std::string>* missing_cols) {
RETURN_IF_ERROR(_wal_reader->read_header(_version, _col_ids));
- std::vector<std::string> col_element;
- string_split(_col_ids, ",", col_element);
RETURN_IF_ERROR(_state->exec_env()->wal_mgr()->get_wal_column_index(_wal_id,
_column_index));
return Status::OK();
}
diff --git a/be/src/vec/sink/writer/vwal_writer.cpp
b/be/src/vec/sink/writer/vwal_writer.cpp
index 2dc945a2a2f..2ab37c340a0 100644
--- a/be/src/vec/sink/writer/vwal_writer.cpp
+++ b/be/src/vec/sink/writer/vwal_writer.cpp
@@ -56,7 +56,7 @@ VWalWriter::~VWalWriter() {}
Status VWalWriter::init() {
RETURN_IF_ERROR(_wal_manager->create_wal_writer(_wal_id, _wal_writer));
- _wal_manager->add_wal_status_queue(_tb_id, _wal_id,
WalManager::WAL_STATUS::CREATE);
+ _wal_manager->add_wal_status_queue(_tb_id, _wal_id,
WalManager::WalStatus::CREATE);
std::stringstream ss;
for (auto slot_desc : _slot_descs) {
if (slot_desc.col_unique_id < 0) {
diff --git a/be/test/olap/wal_manager_test.cpp
b/be/test/olap/wal_manager_test.cpp
index 93d8636eb22..c5c216f12a7 100644
--- a/be/test/olap/wal_manager_test.cpp
+++ b/be/test/olap/wal_manager_test.cpp
@@ -44,8 +44,7 @@
namespace doris {
extern TLoadTxnBeginResult k_stream_load_begin_result;
-extern Status k_stream_load_plan_status;
-extern std::string k_request_line;
+extern Status k_stream_load_exec_status;
ExecEnv* _env = nullptr;
std::string wal_dir = std::string(getenv("DORIS_HOME")) + "/wal_test";
@@ -67,7 +66,6 @@ public:
_env->_store_paths = {StorePath(std::filesystem::current_path(), 0)};
_env->_wal_manager = WalManager::create_shared(_env, wal_dir);
k_stream_load_begin_result = TLoadTxnBeginResult();
- k_stream_load_plan_status = Status::OK();
}
void TearDown() override {
static_cast<void>(io::global_local_filesystem()->delete_directory(wal_dir));
@@ -88,7 +86,7 @@ public:
TEST_F(WalManagerTest, recovery_normal) {
_env->wal_mgr()->wal_limit_test_bytes = 1099511627776;
- k_request_line = "{\"Status\": \"Success\", \"Message\": \"Test\"}";
+ k_stream_load_exec_status = Status::OK();
std::string db_id = "1";
int64_t tb_1_id = 1;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
index 0fcd5050fc8..da82c4406c3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java
@@ -1110,7 +1110,7 @@ public class NativeInsertStmt extends InsertStmt {
}
public void analyzeGroupCommit(Analyzer analyzer) throws AnalysisException
{
- if (isGroupCommitStreamLoadSql && (targetTable instanceof OlapTable)
+ if (isGroupCommitStreamLoadSql && targetTable != null && (targetTable
instanceof OlapTable)
&& !((OlapTable)
targetTable).getTableProperty().getUseSchemaLightChange()) {
throw new AnalysisException(
"table light_schema_change is false, can't do http_stream
with group commit mode");
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index d672597a0db..209947c8b78 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -688,16 +688,6 @@ struct TStreamLoadWithLoadStatusResult {
6: optional i64 unselected_rows
}
-struct TCheckWalRequest {
- 1: optional i64 wal_id
- 2: optional i64 db_id
-}
-
-struct TCheckWalResult {
- 1: optional Status.TStatus status
- 2: optional bool need_recovery
-}
-
struct TKafkaRLTaskProgress {
1: required map<i32,i64> partitionCmtOffset
}
@@ -1303,8 +1293,8 @@ struct TGetBackendMetaResult {
}
struct TColumnInfo {
- 1: optional string columnName
- 2: optional i64 columnId
+ 1: optional string column_name
+ 2: optional i64 column_id
}
struct TGetColumnInfoRequest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]