This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7606538484a [fix](group commit) abort txn should use label if replay wal failed (#30219)
7606538484a is described below
commit 7606538484a7cd315e019673584e2983f77ed95a
Author: meiyi <[email protected]>
AuthorDate: Mon Jan 22 20:33:00 2024 +0800
[fix](group commit) abort txn should use label if replay wal failed (#30219)
---
be/src/olap/wal/wal_manager.cpp | 10 ++++----
be/src/olap/wal/wal_table.cpp | 29 +++++++++++-----------
be/src/olap/wal/wal_table.h | 2 +-
be/src/runtime/group_commit_mgr.cpp | 12 ++++-----
.../apache/doris/service/FrontendServiceImpl.java | 24 ++++++++++++------
gensrc/thrift/FrontendService.thrift | 1 +
6 files changed, 43 insertions(+), 35 deletions(-)
diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 621a1aa0806..45a9b58ff2d 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -198,9 +198,9 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) {
return 0;
}
} else {
- //table_id is -1 meaning get all table wal size
- for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) {
- count += it->second.size();
+ // table_id is -1 meaning get all table wal size
+ for (auto& [_, table_wals] : _wal_queues) {
+ count += table_wals.size();
}
}
return count;
@@ -372,8 +372,8 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
void WalManager::_stop_relay_wal() {
std::lock_guard<std::shared_mutex> wrlock(_table_lock);
- for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
- it->second->stop();
+ for (auto& [_, wal_table] : _table_map) {
+ wal_table->stop();
}
}
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index a6d7a4054c8..8adc9cf5f50 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -58,28 +58,27 @@ void WalTable::_pick_relay_wals() {
std::lock_guard<std::mutex> lock(_replay_wal_lock);
std::vector<std::string> need_replay_wals;
std::vector<std::string> need_erase_wals;
- for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++)
{
- auto wal_info = it->second;
+ for (const auto& [wal_path, wal_info] : _replay_wal_map) {
if (wal_info->get_retry_num() >=
config::group_commit_replay_wal_retry_num) {
LOG(WARNING) << "All replay wal failed, db=" << _db_id << ",
table=" << _table_id
- << ", wal=" << it->first << ", retry_num=" <<
wal_info->get_retry_num();
- auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first,
_table_id,
+ << ", wal=" << wal_path << ", retry_num=" <<
wal_info->get_retry_num();
+ auto st = _exec_env->wal_mgr()->rename_to_tmp_path(wal_path,
_table_id,
wal_info->get_wal_id());
if (!st.ok()) {
- LOG(WARNING) << "rename " << it->first << " fail"
+ LOG(WARNING) << "rename " << wal_path << " fail"
<< ",st:" << st.to_string();
}
if (config::group_commit_wait_replay_wal_finish) {
- auto notify_st =
_exec_env->wal_mgr()->notify_relay_wal(it->second->get_wal_id());
+ auto notify_st =
_exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
if (!notify_st.ok()) {
- LOG(WARNING) << "notify wal " << it->second->get_wal_id()
<< " fail";
+ LOG(WARNING) << "notify wal " << wal_info->get_wal_id() <<
" fail";
}
}
- need_erase_wals.push_back(it->first);
+ need_erase_wals.push_back(wal_path);
continue;
}
if (_need_replay(wal_info)) {
- need_replay_wals.push_back(it->first);
+ need_replay_wals.push_back(wal_path);
}
}
for (const auto& wal : need_erase_wals) {
@@ -168,13 +167,13 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo>
wal_info) {
#endif
}
-Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
+Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
TLoadTxnRollbackRequest request;
request.__set_auth_code(0); // this is a fake, fe not check it now
request.__set_db_id(db_id);
// TODO should we use label, because the replay wal use the same label and
different wal_id
- request.__set_txnId(wal_id);
- std::string reason = "relay wal " + std::to_string(wal_id);
+ request.__set_label(label);
+ std::string reason = "relay wal with label " + label;
request.__set_reason(reason);
TLoadTxnRollbackResult result;
TNetworkAddress master_addr = _exec_env->master_info()->network_address;
@@ -185,7 +184,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t
wal_id) {
},
10000L);
auto result_status = Status::create(result.status);
- LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:"
<< result_status;
+ LOG(INFO) << "abort label " << label << ", st:" << st << ",
result_status:" << result_status;
return result_status;
}
@@ -196,9 +195,9 @@ Status WalTable::_replay_wal_internal(const std::string&
wal) {
RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
#ifndef BE_TEST
if (!config::group_commit_wait_replay_wal_finish) {
- auto st = _try_abort_txn(_db_id, wal_id);
+ auto st = _try_abort_txn(_db_id, label);
if (!st.ok()) {
- LOG(WARNING) << "abort txn " << wal_id << " fail";
+ LOG(WARNING) << "failed to abort txn with label " << label;
}
}
#endif
diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h
index 07287d8f7e6..9b1ead87a20 100644
--- a/be/src/olap/wal/wal_table.h
+++ b/be/src/olap/wal/wal_table.h
@@ -47,7 +47,7 @@ private:
Status _replay_wal_internal(const std::string& wal);
Status _parse_wal_path(const std::string& wal, int64_t& wal_id,
std::string& label);
- Status _try_abort_txn(int64_t db_id, int64_t wal_id);
+ Status _try_abort_txn(int64_t db_id, std::string& label);
Status _get_column_info(int64_t db_id, int64_t tb_id,
std::map<int64_t, std::string>& column_info_map);
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index 3931306cd6f..d057dd92b92 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -191,14 +191,14 @@ Status GroupCommitTable::get_first_block_load_queue(
std::unique_lock l(_lock);
for (int i = 0; i < 3; i++) {
bool is_schema_version_match = true;
- for (auto it = _load_block_queues.begin(); it !=
_load_block_queues.end(); ++it) {
- if (!it->second->need_commit()) {
- if (base_schema_version == it->second->schema_version) {
- if (it->second->add_load_id(load_id).ok()) {
- load_block_queue = it->second;
+ for (const auto& [_, inner_block_queue] : _load_block_queues) {
+ if (!inner_block_queue->need_commit()) {
+ if (base_schema_version ==
inner_block_queue->schema_version) {
+ if (inner_block_queue->add_load_id(load_id).ok()) {
+ load_block_queue = inner_block_queue;
return Status::OK();
}
- } else if (base_schema_version <
it->second->schema_version) {
+ } else if (base_schema_version <
inner_block_queue->schema_version) {
is_schema_version_match = false;
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f9e3372bf28..5b3cfb25ac9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1663,15 +1663,23 @@ public class FrontendServiceImpl implements
FrontendService.Iface {
throw new MetaNotFoundException("db " + request.getDb() + " does
not exist");
}
long dbId = db.getId();
- TransactionState transactionState =
Env.getCurrentGlobalTransactionMgr()
- .getTransactionState(dbId, request.getTxnId());
- if (transactionState == null) {
- throw new UserException("transaction [" + request.getTxnId() + "]
not found");
+ if (request.getTxnId() != 0) { // txnId is required in thrift
+ TransactionState transactionState =
Env.getCurrentGlobalTransactionMgr()
+ .getTransactionState(dbId, request.getTxnId());
+ if (transactionState == null) {
+ throw new UserException("transaction [" + request.getTxnId() +
"] not found");
+ }
+ List<Table> tableList =
db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
+ Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId,
request.getTxnId(),
+ request.isSetReason() ? request.getReason() : "system
cancel",
+
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
+ } else if (request.isSetLabel()) {
+ Env.getCurrentGlobalTransactionMgr()
+ .abortTransaction(db.getId(), request.getLabel(),
+ request.isSetReason() ? request.getReason() :
"system cancel");
+ } else {
+ throw new UserException("must set txn_id or label");
}
- List<Table> tableList =
db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
- Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId,
request.getTxnId(),
- request.isSetReason() ? request.getReason() : "system cancel",
-
TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
}
@Override
diff --git a/gensrc/thrift/FrontendService.thrift
b/gensrc/thrift/FrontendService.thrift
index 9868171109e..7b65103c581 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -835,6 +835,7 @@ struct TLoadTxnRollbackRequest {
12: optional i64 db_id
13: optional list<string> tbls
14: optional string auth_code_uuid
+ 15: optional string label
}
struct TLoadTxnRollbackResult {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]