This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7606538484a [fix](group commit) abort txn should use label if replay wal failed (#30219)
7606538484a is described below

commit 7606538484a7cd315e019673584e2983f77ed95a
Author: meiyi <[email protected]>
AuthorDate: Mon Jan 22 20:33:00 2024 +0800

    [fix](group commit) abort txn should use label if replay wal failed (#30219)
---
 be/src/olap/wal/wal_manager.cpp                    | 10 ++++----
 be/src/olap/wal/wal_table.cpp                      | 29 +++++++++++-----------
 be/src/olap/wal/wal_table.h                        |  2 +-
 be/src/runtime/group_commit_mgr.cpp                | 12 ++++-----
 .../apache/doris/service/FrontendServiceImpl.java  | 24 ++++++++++++------
 gensrc/thrift/FrontendService.thrift               |  1 +
 6 files changed, 43 insertions(+), 35 deletions(-)

diff --git a/be/src/olap/wal/wal_manager.cpp b/be/src/olap/wal/wal_manager.cpp
index 621a1aa0806..45a9b58ff2d 100644
--- a/be/src/olap/wal/wal_manager.cpp
+++ b/be/src/olap/wal/wal_manager.cpp
@@ -198,9 +198,9 @@ size_t WalManager::get_wal_queue_size(int64_t table_id) {
             return 0;
         }
     } else {
-        //table_id is -1 meaning get all table wal size
-        for (auto it = _wal_queues.begin(); it != _wal_queues.end(); it++) {
-            count += it->second.size();
+        // table_id is -1 meaning get all table wal size
+        for (auto& [_, table_wals] : _wal_queues) {
+            count += table_wals.size();
         }
     }
     return count;
@@ -372,8 +372,8 @@ size_t WalManager::get_wal_table_size(int64_t table_id) {
 
 void WalManager::_stop_relay_wal() {
     std::lock_guard<std::shared_mutex> wrlock(_table_lock);
-    for (auto it = _table_map.begin(); it != _table_map.end(); it++) {
-        it->second->stop();
+    for (auto& [_, wal_table] : _table_map) {
+        wal_table->stop();
     }
 }
 
diff --git a/be/src/olap/wal/wal_table.cpp b/be/src/olap/wal/wal_table.cpp
index a6d7a4054c8..8adc9cf5f50 100644
--- a/be/src/olap/wal/wal_table.cpp
+++ b/be/src/olap/wal/wal_table.cpp
@@ -58,28 +58,27 @@ void WalTable::_pick_relay_wals() {
     std::lock_guard<std::mutex> lock(_replay_wal_lock);
     std::vector<std::string> need_replay_wals;
     std::vector<std::string> need_erase_wals;
-    for (auto it = _replay_wal_map.begin(); it != _replay_wal_map.end(); it++) {
-        auto wal_info = it->second;
+    for (const auto& [wal_path, wal_info] : _replay_wal_map) {
         if (wal_info->get_retry_num() >= config::group_commit_replay_wal_retry_num) {
             LOG(WARNING) << "All replay wal failed, db=" << _db_id << ", table=" << _table_id
-                         << ", wal=" << it->first << ", retry_num=" << wal_info->get_retry_num();
-            auto st = _exec_env->wal_mgr()->rename_to_tmp_path(it->first, _table_id,
+                         << ", wal=" << wal_path << ", retry_num=" << wal_info->get_retry_num();
+            auto st = _exec_env->wal_mgr()->rename_to_tmp_path(wal_path, _table_id,
                                                                 wal_info->get_wal_id());
             if (!st.ok()) {
-                LOG(WARNING) << "rename " << it->first << " fail"
+                LOG(WARNING) << "rename " << wal_path << " fail"
                              << ",st:" << st.to_string();
             }
             if (config::group_commit_wait_replay_wal_finish) {
-                auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(it->second->get_wal_id());
+                auto notify_st = _exec_env->wal_mgr()->notify_relay_wal(wal_info->get_wal_id());
                 if (!notify_st.ok()) {
-                    LOG(WARNING) << "notify wal " << it->second->get_wal_id() << " fail";
+                    LOG(WARNING) << "notify wal " << wal_info->get_wal_id() << " fail";
                 }
             }
-            need_erase_wals.push_back(it->first);
+            need_erase_wals.push_back(wal_path);
             continue;
         }
         if (_need_replay(wal_info)) {
-            need_replay_wals.push_back(it->first);
+            need_replay_wals.push_back(wal_path);
         }
     }
     for (const auto& wal : need_erase_wals) {
@@ -168,13 +167,13 @@ bool WalTable::_need_replay(std::shared_ptr<WalInfo> wal_info) {
 #endif
 }
 
-Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
+Status WalTable::_try_abort_txn(int64_t db_id, std::string& label) {
     TLoadTxnRollbackRequest request;
     request.__set_auth_code(0); // this is a fake, fe not check it now
     request.__set_db_id(db_id);
     // TODO should we use label, because the replay wal use the same label and different wal_id
-    request.__set_txnId(wal_id);
-    std::string reason = "relay wal " + std::to_string(wal_id);
+    request.__set_label(label);
+    std::string reason = "relay wal with label " + label;
     request.__set_reason(reason);
     TLoadTxnRollbackResult result;
     TNetworkAddress master_addr = _exec_env->master_info()->network_address;
@@ -185,7 +184,7 @@ Status WalTable::_try_abort_txn(int64_t db_id, int64_t wal_id) {
             },
             10000L);
     auto result_status = Status::create(result.status);
-    LOG(INFO) << "abort txn " << wal_id << ",st:" << st << ",result_status:" << result_status;
+    LOG(INFO) << "abort label " << label << ", st:" << st << ", result_status:" << result_status;
     return result_status;
 }
 
@@ -196,9 +195,9 @@ Status WalTable::_replay_wal_internal(const std::string& wal) {
     RETURN_IF_ERROR(_parse_wal_path(wal, wal_id, label));
 #ifndef BE_TEST
     if (!config::group_commit_wait_replay_wal_finish) {
-        auto st = _try_abort_txn(_db_id, wal_id);
+        auto st = _try_abort_txn(_db_id, label);
         if (!st.ok()) {
-            LOG(WARNING) << "abort txn " << wal_id << " fail";
+            LOG(WARNING) << "failed to abort txn with label " << label;
         }
     }
 #endif
diff --git a/be/src/olap/wal/wal_table.h b/be/src/olap/wal/wal_table.h
index 07287d8f7e6..9b1ead87a20 100644
--- a/be/src/olap/wal/wal_table.h
+++ b/be/src/olap/wal/wal_table.h
@@ -47,7 +47,7 @@ private:
 
     Status _replay_wal_internal(const std::string& wal);
     Status _parse_wal_path(const std::string& wal, int64_t& wal_id, std::string& label);
-    Status _try_abort_txn(int64_t db_id, int64_t wal_id);
+    Status _try_abort_txn(int64_t db_id, std::string& label);
     Status _get_column_info(int64_t db_id, int64_t tb_id,
                             std::map<int64_t, std::string>& column_info_map);
 
diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp
index 3931306cd6f..d057dd92b92 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -191,14 +191,14 @@ Status GroupCommitTable::get_first_block_load_queue(
         std::unique_lock l(_lock);
         for (int i = 0; i < 3; i++) {
             bool is_schema_version_match = true;
-            for (auto it = _load_block_queues.begin(); it != _load_block_queues.end(); ++it) {
-                if (!it->second->need_commit()) {
-                    if (base_schema_version == it->second->schema_version) {
-                        if (it->second->add_load_id(load_id).ok()) {
-                            load_block_queue = it->second;
+            for (const auto& [_, inner_block_queue] : _load_block_queues) {
+                if (!inner_block_queue->need_commit()) {
+                    if (base_schema_version == inner_block_queue->schema_version) {
+                        if (inner_block_queue->add_load_id(load_id).ok()) {
+                            load_block_queue = inner_block_queue;
                             return Status::OK();
                         }
-                    } else if (base_schema_version < it->second->schema_version) {
+                    } else if (base_schema_version < inner_block_queue->schema_version) {
                         is_schema_version_match = false;
                     }
                 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index f9e3372bf28..5b3cfb25ac9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -1663,15 +1663,23 @@ public class FrontendServiceImpl implements FrontendService.Iface {
             throw new MetaNotFoundException("db " + request.getDb() + " does not exist");
         }
         long dbId = db.getId();
-        TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
-                .getTransactionState(dbId, request.getTxnId());
-        if (transactionState == null) {
-            throw new UserException("transaction [" + request.getTxnId() + "] not found");
+        if (request.getTxnId() != 0) { // txnId is required in thrift
+            TransactionState transactionState = Env.getCurrentGlobalTransactionMgr()
+                    .getTransactionState(dbId, request.getTxnId());
+            if (transactionState == null) {
+                throw new UserException("transaction [" + request.getTxnId() + "] not found");
+            }
+            List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
+            Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
+                    request.isSetReason() ? request.getReason() : "system cancel",
+                    TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
+        } else if (request.isSetLabel()) {
+            Env.getCurrentGlobalTransactionMgr()
+                    .abortTransaction(db.getId(), request.getLabel(),
+                            request.isSetReason() ? request.getReason() : "system cancel");
+        } else {
+            throw new UserException("must set txn_id or label");
         }
-        List<Table> tableList = db.getTablesOnIdOrderIfExist(transactionState.getTableIdList());
-        Env.getCurrentGlobalTransactionMgr().abortTransaction(dbId, request.getTxnId(),
-                request.isSetReason() ? request.getReason() : "system cancel",
-                TxnCommitAttachment.fromThrift(request.getTxnCommitAttachment()), tableList);
     }
 
     @Override
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 9868171109e..7b65103c581 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -835,6 +835,7 @@ struct TLoadTxnRollbackRequest {
     12: optional i64 db_id
     13: optional list<string> tbls
     14: optional string auth_code_uuid
+    15: optional string label
 }
 
 struct TLoadTxnRollbackResult {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to