This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-4.0-preview
in repository https://gitbox.apache.org/repos/asf/doris.git

commit eec97ae352badb83e9fdaa7ea5fa285fe2ca5522
Author: walter <[email protected]>
AuthorDate: Fri Apr 19 09:42:41 2024 +0800

    [imporve](cloud) Get key values in batch during txn commit (#33771)
---
 cloud/src/meta-service/meta_service_txn.cpp | 237 ++++++++++++++++++----------
 cloud/src/meta-service/txn_kv.cpp           |  87 +++++-----
 cloud/src/meta-service/txn_kv.h             |  14 +-
 3 files changed, 208 insertions(+), 130 deletions(-)

diff --git a/cloud/src/meta-service/meta_service_txn.cpp 
b/cloud/src/meta-service/meta_service_txn.cpp
index 41a1e6c4dfe..139206ede20 100644
--- a/cloud/src/meta-service/meta_service_txn.cpp
+++ b/cloud/src/meta-service/meta_service_txn.cpp
@@ -48,19 +48,19 @@ static void get_pb_from_tablestats(TableStats& stats, 
TableStatsPB* stats_pb) {
     stats_pb->set_updated_row_count(stats.updated_row_count);
 }
 
-static void calc_table_stats(std::map<int64_t, TabletIndexPB>& tablet_ids,
-                             std::map<int64_t, TabletStats>& tablet_stats,
+static void calc_table_stats(std::unordered_map<int64_t, TabletIndexPB>& 
tablet_ids,
+                             std::unordered_map<int64_t, TabletStats>& 
tablet_stats,
                              std::map<int64_t, TableStats>& table_stats,
                              std::vector<int64_t> base_tablet_ids) {
     int64_t table_id;
 
     VLOG_DEBUG << "base_tablet_ids size: " << base_tablet_ids.size();
-    for (auto& [tablet_id, tablet_stat] : tablet_stats) {
-        // if tablet_id not in base_tablet_ids then skip it
-        if (std::find(base_tablet_ids.begin(), base_tablet_ids.end(), 
tablet_id) ==
-            base_tablet_ids.end()) {
+    for (int64_t tablet_id : base_tablet_ids) {
+        auto it = tablet_stats.find(tablet_id);
+        if (it == tablet_stats.end()) {
             continue;
         }
+        const auto& tablet_stat = it->second;
         table_id = tablet_ids[tablet_id].table_id();
         if (table_stats.find(table_id) == table_stats.end()) {
             table_stats[table_id] = TableStats(tablet_stat.num_rows);
@@ -689,6 +689,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = cast_as<ErrCategory::CREATE>(err);
         ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -700,6 +701,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = cast_as<ErrCategory::READ>(err);
         ss << "failed to get db id, txn_id=" << txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -708,6 +710,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = MetaServiceCode::PROTOBUF_PARSE_ERR;
         ss << "failed to parse txn_index_pb, txn_id=" << txn_id;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -737,12 +740,19 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
 
     std::unique_ptr<RangeGetIterator> it;
     do {
-        err = txn->get(rs_tmp_key0, rs_tmp_key1, &it);
+        err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+        if (err == TxnErrorCode::TXN_TOO_OLD) {
+            err = txn_kv_->create_txn(&txn);
+            if (err == TxnErrorCode::TXN_OK) {
+                err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true);
+            }
+        }
         if (err != TxnErrorCode::TXN_OK) {
             code = cast_as<ErrCategory::READ>(err);
             ss << "internal error, failed to get tmp rowset while committing, 
txn_id=" << txn_id
                << " err=" << err;
             msg = ss.str();
+            LOG(WARNING) << msg;
             return;
         }
 
@@ -752,10 +762,10 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
             tmp_rowsets_meta.emplace_back();
             if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), 
v.size())) {
                 code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "malformed rowset meta, unable to initialize, txn_id=" 
<< txn_id;
+                ss << "malformed rowset meta, unable to initialize, txn_id=" 
<< txn_id
+                   << " key=" << hex(k);
                 msg = ss.str();
-                ss << " key=" << hex(k);
-                LOG(WARNING) << ss.str();
+                LOG(WARNING) << msg;
                 return;
             }
             // Save keys that will be removed later
@@ -775,6 +785,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = cast_as<ErrCategory::CREATE>(err);
         ss << "filed to create txn, txn_id=" << txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -791,6 +802,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
                                                       : 
cast_as<ErrCategory::READ>(err);
         ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << 
txn_id << " err=" << err;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -799,6 +811,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = MetaServiceCode::PROTOBUF_PARSE_ERR;
         ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << 
txn_id;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -808,6 +821,7 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = MetaServiceCode::TXN_ALREADY_ABORTED;
         ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" 
<< txn_id;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
@@ -830,86 +844,123 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
         code = MetaServiceCode::TXN_INVALID_STATUS;
         ss << "transaction is prepare, not pre-committed: db_id=" << db_id << 
" txn_id" << txn_id;
         msg = ss.str();
+        LOG(WARNING) << msg;
         return;
     }
 
     LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << 
txn_info.ShortDebugString();
 
     // Prepare rowset meta and new_versions
-    std::vector<std::pair<std::string, std::string>> rowsets;
-    std::map<std::string, uint64_t> new_versions;
-    std::map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats
-    std::map<int64_t, TabletIndexPB> tablet_ids; // tablet_id -> 
{table/index/partition}_id
-    std::map<int64_t, std::vector<int64_t>> table_id_tablet_ids; // table_id 
-> tablets_ids
-    rowsets.reserve(tmp_rowsets_meta.size());
+    // Read tablet indexes in batch.
+    std::vector<std::string> tablet_idx_keys;
     for (auto& [_, i] : tmp_rowsets_meta) {
-        int64_t tablet_id = i.tablet_id();
-        // Get version for the rowset
-        if (tablet_ids.count(tablet_id) == 0) {
-            MetaTabletIdxKeyInfo key_info {instance_id, tablet_id};
-            auto [key, val] = std::make_tuple(std::string(""), 
std::string(""));
-            meta_tablet_idx_key(key_info, &key);
-            TxnErrorCode err = txn->get(key, &val);
-            if (err != TxnErrorCode::TXN_OK) { // Must be TXN_OK, an existing 
value
-                code = cast_as<ErrCategory::READ>(err);
-                ss << "failed to get tablet table index ids,"
-                   << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " not found" : 
" internal error")
-                   << " tablet_id=" << tablet_id << " key=" << hex(key);
-                msg = ss.str();
-                LOG(INFO) << msg << " err=" << err << " txn_id=" << txn_id;
-                return;
-            }
-            if (!tablet_ids[tablet_id].ParseFromString(val)) {
-                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                ss << "malformed tablet index value tablet_id=" << tablet_id
-                   << " txn_id=" << txn_id;
-                msg = ss.str();
-                return;
-            }
-            
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
-            VLOG_DEBUG << "tablet_id:" << tablet_id
-                       << " value:" << 
tablet_ids[tablet_id].ShortDebugString();
+        tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, 
i.tablet_id()}));
+    }
+    std::vector<std::optional<std::string>> tablet_idx_values;
+    err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, 
Transaction::BatchGetOptions(false));
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get tablet table index ids, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+
+    size_t total_rowsets = tmp_rowsets_meta.size();
+    // tablet_id -> {table/index/partition}_id
+    std::unordered_map<int64_t, TabletIndexPB> tablet_ids;
+    // table_id -> tablets_ids
+    std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids;
+    for (size_t i = 0; i < total_rowsets; i++) {
+        uint64_t tablet_id = tmp_rowsets_meta[i].second.tablet_id();
+        if (!tablet_idx_values[i].has_value()) [[unlikely]] {
+            // The value must existed
+            code = MetaServiceCode::KV_TXN_GET_ERR;
+            ss << "failed to get tablet table index ids, err=not found"
+               << " tablet_id=" << tablet_id << " key=" << 
hex(tablet_idx_keys[i]);
+            msg = ss.str();
+            LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id;
+            return;
+        }
+        if 
(!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) 
[[unlikely]] {
+            code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+            ss << "malformed tablet index value tablet_id=" << tablet_id << " 
txn_id=" << txn_id;
+            msg = ss.str();
+            LOG(WARNING) << msg;
+            return;
         }
+        
table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id);
+        VLOG_DEBUG << "tablet_id:" << tablet_id
+                   << " value:" << tablet_ids[tablet_id].ShortDebugString();
+    }
+
+    tablet_idx_keys.clear();
+    tablet_idx_values.clear();
 
+    // {table/partition} -> version
+    std::unordered_map<std::string, uint64_t> new_versions;
+    std::vector<std::string> version_keys;
+    for (auto& [_, i] : tmp_rowsets_meta) {
+        int64_t tablet_id = i.tablet_id();
         int64_t table_id = tablet_ids[tablet_id].table_id();
         int64_t partition_id = i.partition_id();
-
         std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
-        int64_t version = -1;
-        std::string ver_val_str;
-        int64_t new_version = -1;
-        VersionPB version_pb;
         if (new_versions.count(ver_key) == 0) {
-            err = txn->get(ver_key, &ver_val_str);
-            if (err != TxnErrorCode::TXN_OK && err != 
TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                code = cast_as<ErrCategory::READ>(err);
-                ss << "failed to get version, table_id=" << table_id
-                   << "partition_id=" << partition_id << " key=" << 
hex(ver_key);
+            new_versions.insert({ver_key, 0});
+            version_keys.push_back(std::move(ver_key));
+        }
+    }
+    std::vector<std::optional<std::string>> version_values;
+    err = txn->batch_get(&version_values, version_keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        code = cast_as<ErrCategory::READ>(err);
+        ss << "failed to get partition versions, err=" << err;
+        msg = ss.str();
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+    size_t total_versions = version_keys.size();
+    for (size_t i = 0; i < total_versions; i++) {
+        int64_t version;
+        if (version_values[i].has_value()) {
+            VersionPB version_pb;
+            if (!version_pb.ParseFromString(version_values[i].value())) {
+                code = MetaServiceCode::PROTOBUF_PARSE_ERR;
+                ss << "failed to parse version pb txn_id=" << txn_id
+                   << " key=" << hex(version_keys[i]);
                 msg = ss.str();
-                LOG(INFO) << msg << " txn_id=" << txn_id;
                 return;
             }
-
-            if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-                // Maybe first version
-                version = 1;
-            } else {
-                if (!version_pb.ParseFromString(ver_val_str)) {
-                    code = MetaServiceCode::PROTOBUF_PARSE_ERR;
-                    ss << "failed to parse ver_val_str"
-                       << " txn_id=" << txn_id << " key=" << hex(ver_key);
-                    msg = ss.str();
-                    return;
-                }
-                version = version_pb.version();
-            }
-            new_version = version + 1;
-            new_versions.insert({std::move(ver_key), new_version});
+            version = version_pb.version();
         } else {
-            new_version = new_versions[ver_key];
+            version = 1;
+        }
+        new_versions[version_keys[i]] = version + 1;
+    }
+    version_keys.clear();
+    version_values.clear();
+
+    std::vector<std::pair<std::string, std::string>> rowsets;
+    std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> 
stats
+    rowsets.reserve(tmp_rowsets_meta.size());
+    for (auto& [_, i] : tmp_rowsets_meta) {
+        int64_t tablet_id = i.tablet_id();
+        int64_t table_id = tablet_ids[tablet_id].table_id();
+        int64_t partition_id = i.partition_id();
+        std::string ver_key = partition_version_key({instance_id, db_id, 
table_id, partition_id});
+        if (new_versions[ver_key] == 0) [[unlikely]] {
+            // it is impossible.
+            code = MetaServiceCode::UNDEFINED_ERR;
+            ss << "failed to get partition version key, the target version not 
exists in "
+                  "new_versions."
+               << " txn_id=" << txn_id;
+            msg = ss.str();
+            LOG(ERROR) << msg;
+            return;
         }
 
         // Update rowset version
+        int64_t new_version = new_versions[ver_key];
         i.set_start_version(new_version);
         i.set_end_version(new_version);
 
@@ -932,30 +983,40 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
     } // for tmp_rowsets_meta
 
     // process mow table, check lock and remove pending key
+    std::vector<std::string> lock_keys;
+    lock_keys.reserve(request->mow_table_ids().size());
     for (auto table_id : request->mow_table_ids()) {
-        std::string lock_key = 
meta_delete_bitmap_update_lock_key({instance_id, table_id, -1});
-        std::string lock_val;
-        err = txn->get(lock_key, &lock_val);
-        LOG(INFO) << "get delete bitmap update lock info, table_id=" << 
table_id
-                  << " key=" << hex(lock_key) << " err=" << err;
+        lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, 
table_id, -1}));
+    }
+    std::vector<std::optional<std::string>> lock_values;
+    err = txn->batch_get(&lock_values, lock_keys);
+    if (err != TxnErrorCode::TXN_OK) {
+        ss << "failed to get delete bitmap update lock key info, instance_id=" 
<< instance_id
+           << " err=" << err;
+        msg = ss.str();
+        code = cast_as<ErrCategory::READ>(err);
+        LOG(WARNING) << msg << " txn_id=" << txn_id;
+        return;
+    }
+    size_t total_locks = lock_keys.size();
+    for (size_t i = 0; i < total_locks; i++) {
+        int64_t table_id = request->mow_table_ids(i);
         // When the key does not exist, it means the lock has been acquired
         // by another transaction and successfully committed.
-        if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) {
-            msg = "lock is expired";
+        if (!lock_values[i].has_value()) {
+            ss << "get delete bitmap update lock info, lock is expired"
+               << " table_id=" << table_id << " key=" << hex(lock_keys[i]);
             code = MetaServiceCode::LOCK_EXPIRED;
-            return;
-        }
-        if (err != TxnErrorCode::TXN_OK) {
-            ss << "failed to get delete bitmap update lock key info, 
instance_id=" << instance_id
-               << " table_id=" << table_id << " key=" << hex(lock_key) << " 
err=" << err;
             msg = ss.str();
-            code = cast_as<ErrCategory::READ>(err);
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
             return;
         }
+
         DeleteBitmapUpdateLockPB lock_info;
-        if (!lock_info.ParseFromString(lock_val)) [[unlikely]] {
+        if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] {
             code = MetaServiceCode::PROTOBUF_PARSE_ERR;
             msg = "failed to parse DeleteBitmapUpdateLockPB";
+            LOG(WARNING) << msg << " txn_id=" << txn_id;
             return;
         }
         if (lock_info.lock_id() != request->txn_id()) {
@@ -963,8 +1024,8 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
             code = MetaServiceCode::LOCK_EXPIRED;
             return;
         }
-        txn->remove(lock_key);
-        LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << 
hex(lock_key)
+        txn->remove(lock_keys[i]);
+        LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << 
hex(lock_keys[i])
                   << " txn_id=" << txn_id;
 
         for (auto tablet_id : table_id_tablet_ids[table_id]) {
@@ -974,6 +1035,8 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
                       << " txn_id=" << txn_id;
         }
     }
+    lock_keys.clear();
+    lock_values.clear();
 
     // Save rowset meta
     num_put_keys += rowsets.size();
@@ -1004,8 +1067,8 @@ void 
MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller,
                   << " txn_id=" << txn_id;
 
         std::string_view ver_key = i.first;
-        //PartitionVersionKeyInfo  {instance_id, db_id, table_id, partition_id}
         ver_key.remove_prefix(1); // Remove key space
+        // PartitionVersionKeyInfo  {instance_id, db_id, table_id, 
partition_id}
         std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> 
out;
         int ret = decode_key(&ver_key, &out);
         if (ret != 0) [[unlikely]] {
diff --git a/cloud/src/meta-service/txn_kv.cpp 
b/cloud/src/meta-service/txn_kv.cpp
index 54cec231f6f..5afe0edb409 100644
--- a/cloud/src/meta-service/txn_kv.cpp
+++ b/cloud/src/meta-service/txn_kv.cpp
@@ -23,10 +23,8 @@
 #include <algorithm>
 #include <atomic>
 #include <cstring>
-#include <iomanip>
 #include <memory>
 #include <optional>
-#include <sstream>
 #include <string_view>
 #include <thread>
 #include <vector>
@@ -491,53 +489,62 @@ TxnErrorCode RangeGetIterator::init() {
 TxnErrorCode Transaction::batch_get(std::vector<std::optional<std::string>>* 
res,
                                     const std::vector<std::string>& keys,
                                     const BatchGetOptions& opts) {
+    struct FDBFutureDelete {
+        void operator()(FDBFuture* future) { fdb_future_destroy(future); }
+    };
+
+    res->clear();
     if (keys.empty()) {
         return TxnErrorCode::TXN_OK;
     }
-    StopWatch sw;
-    std::vector<FDBFuture*> futures;
-    futures.reserve(keys.size());
-    for (const auto& k : keys) {
-        futures.push_back(fdb_transaction_get(txn_, (uint8_t*)k.data(), 
k.size(), opts.snapshot));
-    }
 
-    auto release_futures = [&futures, &sw](int*) {
-        std::for_each(futures.begin(), futures.end(),
-                      [](FDBFuture* fut) { fdb_future_destroy(fut); });
-        g_bvar_txn_kv_batch_get << sw.elapsed_us();
-    };
-    std::unique_ptr<int, decltype(release_futures)> defer((int*)0x01, 
std::move(release_futures));
+    StopWatch sw;
+    auto stop_watcher = [&sw](int*) { g_bvar_txn_kv_batch_get << 
sw.elapsed_us(); };
+    std::unique_ptr<int, decltype(stop_watcher)> defer((int*)0x01, 
std::move(stop_watcher));
 
+    size_t num_keys = keys.size();
     res->reserve(keys.size());
-    DCHECK(keys.size() == futures.size());
-    auto size = futures.size();
-    for (auto i = 0; i < size; ++i) {
-        const auto& fut = futures[i];
-        RETURN_IF_ERROR(await_future(fut));
-        auto err = fdb_future_get_error(fut);
-        if (err) {
-            LOG(WARNING) << __PRETTY_FUNCTION__
-                         << " failed to fdb_future_get_error err=" << 
fdb_get_error(err)
-                         << " key=" << hex(keys[i]);
-            return cast_as_txn_code(err);
+    std::vector<std::unique_ptr<FDBFuture, FDBFutureDelete>> futures;
+    futures.reserve(opts.concurrency);
+    for (size_t i = 0; i < num_keys; i += opts.concurrency) {
+        size_t size = std::min(i + opts.concurrency, num_keys);
+        for (size_t j = i; j < size; j++) {
+            const auto& k = keys[j];
+            futures.emplace_back(
+                    fdb_transaction_get(txn_, (uint8_t*)k.data(), k.size(), 
opts.snapshot));
         }
-        fdb_bool_t found;
-        const uint8_t* ret;
-        int len;
-        err = fdb_future_get_value(fut, &found, &ret, &len);
-
-        if (err) {
-            LOG(WARNING) << __PRETTY_FUNCTION__
-                         << " failed to fdb_future_get_value err=" << 
fdb_get_error(err)
-                         << " key=" << hex(keys[i]);
-            return cast_as_txn_code(err);
-        }
-        if (!found) {
-            res->push_back(std::nullopt);
-            continue;
+
+        size_t num_futures = futures.size();
+        for (auto j = 0; j < num_futures; j++) {
+            FDBFuture* future = futures[j].get();
+            std::string_view key = keys[i + j];
+            RETURN_IF_ERROR(await_future(future));
+            fdb_error_t err = fdb_future_get_error(future);
+            if (err) {
+                LOG(WARNING) << __PRETTY_FUNCTION__
+                             << " failed to fdb_future_get_error err=" << 
fdb_get_error(err)
+                             << " key=" << hex(key);
+                return cast_as_txn_code(err);
+            }
+            fdb_bool_t found;
+            const uint8_t* ret;
+            int len;
+            err = fdb_future_get_value(future, &found, &ret, &len);
+            if (err) {
+                LOG(WARNING) << __PRETTY_FUNCTION__
+                             << " failed to fdb_future_get_value err=" << 
fdb_get_error(err)
+                             << " key=" << hex(key);
+                return cast_as_txn_code(err);
+            }
+            if (!found) {
+                res->push_back(std::nullopt);
+                continue;
+            }
+            res->push_back(std::string((char*)ret, len));
         }
-        res->push_back(std::string((char*)ret, len));
+        futures.clear();
     }
+    DCHECK_EQ(res->size(), num_keys);
     return TxnErrorCode::TXN_OK;
 }
 
diff --git a/cloud/src/meta-service/txn_kv.h b/cloud/src/meta-service/txn_kv.h
index 6cf225de3cc..12ee38823f9 100644
--- a/cloud/src/meta-service/txn_kv.h
+++ b/cloud/src/meta-service/txn_kv.h
@@ -147,10 +147,18 @@ public:
     virtual TxnErrorCode abort() = 0;
 
     struct BatchGetOptions {
-        BatchGetOptions() : snapshot(false) {};
+        BatchGetOptions() : BatchGetOptions(false) {};
+        BatchGetOptions(bool s) : snapshot(s), concurrency(1000) {};
+
+        // if true, `key` will not be included in txn conflict detection this 
time.
+        //
+        // Default: false
         bool snapshot;
-        // TODO: Avoid consuming too many resources in one batch
-        // int limit = 1000;
+
+        // the maximum number of concurrent requests submitted to fdb at one 
time.
+        //
+        // Default: 1000
+        int concurrency;
     };
     /**
      * @brief batch get keys


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to