SWJTU-ZhangLei commented on code in PR #38243:
URL: https://github.com/apache/doris/pull/38243#discussion_r1732038003


##########
cloud/src/meta-service/meta_service_txn.cpp:
##########
@@ -862,94 +863,485 @@ void update_tablet_stats(const StatsTabletKeyInfo& info, 
const TabletStats& stat
  */
 void commit_txn_immediately(
         const CommitTxnRequest* request, CommitTxnResponse* response,
-        std::shared_ptr<TxnKv>& txn_kv, MetaServiceCode& code, std::string& 
msg,
-        const std::string& instance_id, int64_t db_id,
+        std::shared_ptr<TxnKv>& txn_kv, std::shared_ptr<TxnLazyCommitter>& 
txn_lazy_committer,
+        MetaServiceCode& code, std::string& msg, const std::string& 
instance_id, int64_t db_id,
         std::vector<std::pair<std::string, doris::RowsetMetaCloudPB>>& 
tmp_rowsets_meta) {
     std::stringstream ss;
     int64_t txn_id = request->txn_id();
-    std::unique_ptr<Transaction> txn;
-    TxnErrorCode err = txn_kv->create_txn(&txn);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = cast_as<ErrCategory::CREATE>(err);
-        ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
+    bool need_advance_last_txn = false;
+    int64_t last_pending_txn_id = 0;
+    do {
+        std::unique_ptr<Transaction> txn;
+        TxnErrorCode err = txn_kv->create_txn(&txn);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::CREATE>(err);
+            ss << "failed to create txn, txn_id=" << txn_id << " err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
 
-    // Get txn info with db_id and txn_id
-    std::string info_val; // Will be reused when saving updated txn
-    const std::string info_key = txn_info_key({instance_id, db_id, txn_id});
-    err = txn->get(info_key, &info_val);
-    if (err != TxnErrorCode::TXN_OK) {
-        code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
-                                                      : 
cast_as<ErrCategory::READ>(err);
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            ss << "transaction [" << txn_id << "] not found, db_id=" << db_id;
-        } else {
-            ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id
-               << " err=" << err;
+        // Get txn info with db_id and txn_id
+        std::string info_val; // Will be reused when saving updated txn
+        const std::string info_key = txn_info_key({instance_id, db_id, 
txn_id});
+        err = txn->get(info_key, &info_val);
+        if (err != TxnErrorCode::TXN_OK) {
+            code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? 
MetaServiceCode::TXN_ID_NOT_FOUND
+                                                          : 
cast_as<ErrCategory::READ>(err);
+            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
+                ss << "transaction [" << txn_id << "] not found, db_id=" << 
db_id;
+            } else {
+                ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" 
<< txn_id
+                   << " err=" << err;
+            }
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
         }
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
 
-    TxnInfoPB txn_info;
-    if (!txn_info.ParseFromString(info_val)) {
-        code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-        ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
+        TxnInfoPB txn_info;
+        if (!txn_info.ParseFromString(info_val)) {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
 
-    // TODO: do more check like txn state, 2PC etc.
-    DCHECK(txn_info.txn_id() == txn_id);
-    if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
-        code = MetaServiceCode::TXN_ALREADY_ABORTED;
-        ss << "transaction [" << txn_id << "] is already aborted, db_id=" << 
db_id;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
+        // TODO: do more check like txn state, 2PC etc.
+        DCHECK(txn_info.txn_id() == txn_id);
+        if (txn_info.status() == TxnStatusPB::TXN_STATUS_ABORTED) {
+            code = MetaServiceCode::TXN_ALREADY_ABORTED;
+            ss << "transaction [" << txn_id << "] is already aborted, db_id=" 
<< db_id;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
 
-    if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
-        if (request->has_is_2pc() && request->is_2pc()) {
-            code = MetaServiceCode::TXN_ALREADY_VISIBLE;
-            ss << "transaction [" << txn_id << "] is already visible, not 
pre-committed.";
+        if (txn_info.status() == TxnStatusPB::TXN_STATUS_VISIBLE) {
+            if (request->has_is_2pc() && request->is_2pc()) {
+                code = MetaServiceCode::TXN_ALREADY_VISIBLE;
+                ss << "transaction [" << txn_id << "] is already visible, not 
pre-committed.";
+                msg = ss.str();
+                LOG(INFO) << msg;
+                response->mutable_txn_info()->CopyFrom(txn_info);
+                return;
+            }
+            code = MetaServiceCode::OK;
+            ss << "transaction is already visible: db_id=" << db_id << " 
txn_id=" << txn_id;
             msg = ss.str();
             LOG(INFO) << msg;
             response->mutable_txn_info()->CopyFrom(txn_info);
             return;
         }
-        code = MetaServiceCode::OK;
-        ss << "transaction is already visible: db_id=" << db_id << " txn_id=" 
<< txn_id;
-        msg = ss.str();
-        LOG(INFO) << msg;
-        response->mutable_txn_info()->CopyFrom(txn_info);
-        return;
-    }
 
-    if (request->has_is_2pc() && request->is_2pc() &&
-        txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
-        code = MetaServiceCode::TXN_INVALID_STATUS;
-        ss << "transaction is prepare, not pre-committed: db_id=" << db_id << 
" txn_id" << txn_id;
-        msg = ss.str();
-        LOG(WARNING) << msg;
-        return;
-    }
+        if (request->has_is_2pc() && request->is_2pc() &&
+            txn_info.status() == TxnStatusPB::TXN_STATUS_PREPARED) {
+            code = MetaServiceCode::TXN_INVALID_STATUS;
+            ss << "transaction is prepare, not pre-committed: db_id=" << db_id 
<< " txn_id"
+               << txn_id;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
+        }
 
-    LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
+        LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
 
-    // Prepare rowset meta and new_versions
+        // Prepare rowset meta and new_versions
+        // Read tablet indexes in batch.
+        std::vector<std::string> tablet_idx_keys;
+        for (auto& [_, i] : tmp_rowsets_meta) {
+            tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, 
i.tablet_id()}));
+        }
+        std::vector<std::optional<std::string>> tablet_idx_values;
+        err = txn->batch_get(&tablet_idx_values, tablet_idx_keys,
+                             Transaction::BatchGetOptions(false));
+        if (err != TxnErrorCode::TXN_OK) {
+            code = cast_as<ErrCategory::READ>(err);
+            ss << "failed to get tablet table index ids, err=" << err;
+            msg = ss.str();
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
+            return;
+        }
+
+        size_t total_rowsets = tmp_rowsets_meta.size();
+        // tablet_id -> {table/index/partition}_id
+        std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+        // table_id -> tablets_ids
+        std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+        for (size_t i = 0; i < total_rowsets; i++) {
+            uint64_t tablet_id = tmp_rowsets_meta[i].second.tablet_id();
+            if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+                // The value must existed
+                code = MetaServiceCode::KV_TXN_GET_ERR;
+                ss << "failed to get tablet table index ids, err=not found"
+                   << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
+                msg = ss.str();
+                LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+                return;
+            }
+            if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "malformed tablet index value tablet_id=" << tablet_id

Review Comment:
   > do not change what you don't modify
   
   The reformatting is required to pass the pipeline's code format check.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to