This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-4.0-preview in repository https://gitbox.apache.org/repos/asf/doris.git
commit eec97ae352badb83e9fdaa7ea5fa285fe2ca5522 Author: walter <[email protected]> AuthorDate: Fri Apr 19 09:42:41 2024 +0800 [imporve](cloud) Get key values in batch during txn commit (#33771) --- cloud/src/meta-service/meta_service_txn.cpp | 237 ++++++++++++++++++---------- cloud/src/meta-service/txn_kv.cpp | 87 +++++----- cloud/src/meta-service/txn_kv.h | 14 +- 3 files changed, 208 insertions(+), 130 deletions(-) diff --git a/cloud/src/meta-service/meta_service_txn.cpp b/cloud/src/meta-service/meta_service_txn.cpp index 41a1e6c4dfe..139206ede20 100644 --- a/cloud/src/meta-service/meta_service_txn.cpp +++ b/cloud/src/meta-service/meta_service_txn.cpp @@ -48,19 +48,19 @@ static void get_pb_from_tablestats(TableStats& stats, TableStatsPB* stats_pb) { stats_pb->set_updated_row_count(stats.updated_row_count); } -static void calc_table_stats(std::map<int64_t, TabletIndexPB>& tablet_ids, - std::map<int64_t, TabletStats>& tablet_stats, +static void calc_table_stats(std::unordered_map<int64_t, TabletIndexPB>& tablet_ids, + std::unordered_map<int64_t, TabletStats>& tablet_stats, std::map<int64_t, TableStats>& table_stats, std::vector<int64_t> base_tablet_ids) { int64_t table_id; VLOG_DEBUG << "base_tablet_ids size: " << base_tablet_ids.size(); - for (auto& [tablet_id, tablet_stat] : tablet_stats) { - // if tablet_id not in base_tablet_ids then skip it - if (std::find(base_tablet_ids.begin(), base_tablet_ids.end(), tablet_id) == - base_tablet_ids.end()) { + for (int64_t tablet_id : base_tablet_ids) { + auto it = tablet_stats.find(tablet_id); + if (it == tablet_stats.end()) { continue; } + const auto& tablet_stat = it->second; table_id = tablet_ids[tablet_id].table_id(); if (table_stats.find(table_id) == table_stats.end()) { table_stats[table_id] = TableStats(tablet_stat.num_rows); @@ -689,6 +689,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = cast_as<ErrCategory::CREATE>(err); ss << "filed to create txn, txn_id=" << txn_id << " err=" << err; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -700,6 +701,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = cast_as<ErrCategory::READ>(err); ss << "failed to get db id, txn_id=" << txn_id << " err=" << err; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -708,6 +710,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = MetaServiceCode::PROTOBUF_PARSE_ERR; ss << "failed to parse txn_index_pb, txn_id=" << txn_id; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -737,12 +740,19 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, std::unique_ptr<RangeGetIterator> it; do { - err = txn->get(rs_tmp_key0, rs_tmp_key1, &it); + err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); + if (err == TxnErrorCode::TXN_TOO_OLD) { + err = txn_kv_->create_txn(&txn); + if (err == TxnErrorCode::TXN_OK) { + err = txn->get(rs_tmp_key0, rs_tmp_key1, &it, true); + } + } if (err != TxnErrorCode::TXN_OK) { code = cast_as<ErrCategory::READ>(err); ss << "internal error, failed to get tmp rowset while committing, txn_id=" << txn_id << " err=" << err; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -752,10 +762,10 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, tmp_rowsets_meta.emplace_back(); if (!tmp_rowsets_meta.back().second.ParseFromArray(v.data(), v.size())) { code = MetaServiceCode::PROTOBUF_PARSE_ERR; - ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id; + ss << "malformed rowset meta, unable to initialize, txn_id=" << txn_id + << " key=" << hex(k); msg = ss.str(); - ss << " key=" << hex(k); - LOG(WARNING) << ss.str(); + LOG(WARNING) << msg; return; } // Save keys that will be removed later @@ -775,6 +785,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = cast_as<ErrCategory::CREATE>(err); ss << "filed to create txn, txn_id=" << txn_id << " err=" << err; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -791,6 +802,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, : cast_as<ErrCategory::READ>(err); ss << "failed to get txn_info, db_id=" << db_id << " txn_id=" << txn_id << " err=" << err; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -799,6 +811,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = MetaServiceCode::PROTOBUF_PARSE_ERR; ss << "failed to parse txn_info, db_id=" << db_id << " txn_id=" << txn_id; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -808,6 +821,7 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = MetaServiceCode::TXN_ALREADY_ABORTED; ss << "transaction is already aborted: db_id=" << db_id << " txn_id=" << txn_id; msg = ss.str(); + LOG(WARNING) << msg; return; } @@ -830,86 +844,123 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = MetaServiceCode::TXN_INVALID_STATUS; ss << "transaction is prepare, not pre-committed: db_id=" << db_id << " txn_id" << txn_id; msg = ss.str(); + LOG(WARNING) << msg; return; } LOG(INFO) << "txn_id=" << txn_id << " txn_info=" << txn_info.ShortDebugString(); // Prepare rowset meta and new_versions - std::vector<std::pair<std::string, std::string>> rowsets; - std::map<std::string, uint64_t> new_versions; - std::map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats - std::map<int64_t, TabletIndexPB> tablet_ids; // tablet_id -> {table/index/partition}_id - std::map<int64_t, std::vector<int64_t>> table_id_tablet_ids; // table_id -> tablets_ids - rowsets.reserve(tmp_rowsets_meta.size()); + // Read tablet indexes in batch. + std::vector<std::string> tablet_idx_keys; for (auto& [_, i] : tmp_rowsets_meta) { - int64_t tablet_id = i.tablet_id(); - // Get version for the rowset - if (tablet_ids.count(tablet_id) == 0) { - MetaTabletIdxKeyInfo key_info {instance_id, tablet_id}; - auto [key, val] = std::make_tuple(std::string(""), std::string("")); - meta_tablet_idx_key(key_info, &key); - TxnErrorCode err = txn->get(key, &val); - if (err != TxnErrorCode::TXN_OK) { // Must be TXN_OK, an existing value - code = cast_as<ErrCategory::READ>(err); - ss << "failed to get tablet table index ids," - << (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " not found" : " internal error") - << " tablet_id=" << tablet_id << " key=" << hex(key); - msg = ss.str(); - LOG(INFO) << msg << " err=" << err << " txn_id=" << txn_id; - return; - } - if (!tablet_ids[tablet_id].ParseFromString(val)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - ss << "malformed tablet index value tablet_id=" << tablet_id - << " txn_id=" << txn_id; - msg = ss.str(); - return; - } - table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id); - VLOG_DEBUG << "tablet_id:" << tablet_id - << " value:" << tablet_ids[tablet_id].ShortDebugString(); + tablet_idx_keys.push_back(meta_tablet_idx_key({instance_id, i.tablet_id()})); + } + std::vector<std::optional<std::string>> tablet_idx_values; + err = txn->batch_get(&tablet_idx_values, tablet_idx_keys, Transaction::BatchGetOptions(false)); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get tablet table index ids, err=" << err; + msg = ss.str(); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + + size_t total_rowsets = tmp_rowsets_meta.size(); + // tablet_id -> {table/index/partition}_id + std::unordered_map<int64_t, TabletIndexPB> tablet_ids; + // table_id -> tablets_ids + std::unordered_map<int64_t, std::vector<int64_t>> table_id_tablet_ids; + for (size_t i = 0; i < total_rowsets; i++) { + uint64_t tablet_id = tmp_rowsets_meta[i].second.tablet_id(); + if (!tablet_idx_values[i].has_value()) [[unlikely]] { + // The value must existed + code = MetaServiceCode::KV_TXN_GET_ERR; + ss << "failed to get tablet table index ids, err=not found" + << " tablet_id=" << tablet_id << " key=" << hex(tablet_idx_keys[i]); + msg = ss.str(); + LOG(WARNING) << msg << " err=" << err << " txn_id=" << txn_id; + return; + } + if (!tablet_ids[tablet_id].ParseFromString(tablet_idx_values[i].value())) [[unlikely]] { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "malformed tablet index value tablet_id=" << tablet_id << " txn_id=" << txn_id; + msg = ss.str(); + LOG(WARNING) << msg; + return; } + table_id_tablet_ids[tablet_ids[tablet_id].table_id()].push_back(tablet_id); + VLOG_DEBUG << "tablet_id:" << tablet_id + << " value:" << tablet_ids[tablet_id].ShortDebugString(); + } + + tablet_idx_keys.clear(); + tablet_idx_values.clear(); + // {table/partition} -> version + std::unordered_map<std::string, uint64_t> new_versions; + std::vector<std::string> version_keys; + for (auto& [_, i] : tmp_rowsets_meta) { + int64_t tablet_id = i.tablet_id(); int64_t table_id = tablet_ids[tablet_id].table_id(); int64_t partition_id = i.partition_id(); - std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); - int64_t version = -1; - std::string ver_val_str; - int64_t new_version = -1; - VersionPB version_pb; if (new_versions.count(ver_key) == 0) { - err = txn->get(ver_key, &ver_val_str); - if (err != TxnErrorCode::TXN_OK && err != TxnErrorCode::TXN_KEY_NOT_FOUND) { - code = cast_as<ErrCategory::READ>(err); - ss << "failed to get version, table_id=" << table_id - << "partition_id=" << partition_id << " key=" << hex(ver_key); + new_versions.insert({ver_key, 0}); + version_keys.push_back(std::move(ver_key)); + } + } + std::vector<std::optional<std::string>> version_values; + err = txn->batch_get(&version_values, version_keys); + if (err != TxnErrorCode::TXN_OK) { + code = cast_as<ErrCategory::READ>(err); + ss << "failed to get partition versions, err=" << err; + msg = ss.str(); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + size_t total_versions = version_keys.size(); + for (size_t i = 0; i < total_versions; i++) { + int64_t version; + if (version_values[i].has_value()) { + VersionPB version_pb; + if (!version_pb.ParseFromString(version_values[i].value())) { + code = MetaServiceCode::PROTOBUF_PARSE_ERR; + ss << "failed to parse version pb txn_id=" << txn_id + << " key=" << hex(version_keys[i]); msg = ss.str(); - LOG(INFO) << msg << " txn_id=" << txn_id; return; } - - if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { - // Maybe first version - version = 1; - } else { - if (!version_pb.ParseFromString(ver_val_str)) { - code = MetaServiceCode::PROTOBUF_PARSE_ERR; - ss << "failed to parse ver_val_str" - << " txn_id=" << txn_id << " key=" << hex(ver_key); - msg = ss.str(); - return; - } - version = version_pb.version(); - } - new_version = version + 1; - new_versions.insert({std::move(ver_key), new_version}); + version = version_pb.version(); } else { - new_version = new_versions[ver_key]; + version = 1; + } + new_versions[version_keys[i]] = version + 1; + } + version_keys.clear(); + version_values.clear(); + + std::vector<std::pair<std::string, std::string>> rowsets; + std::unordered_map<int64_t, TabletStats> tablet_stats; // tablet_id -> stats + rowsets.reserve(tmp_rowsets_meta.size()); + for (auto& [_, i] : tmp_rowsets_meta) { + int64_t tablet_id = i.tablet_id(); + int64_t table_id = tablet_ids[tablet_id].table_id(); + int64_t partition_id = i.partition_id(); + std::string ver_key = partition_version_key({instance_id, db_id, table_id, partition_id}); + if (new_versions[ver_key] == 0) [[unlikely]] { + // it is impossible. + code = MetaServiceCode::UNDEFINED_ERR; + ss << "failed to get partition version key, the target version not exists in " + "new_versions." + << " txn_id=" << txn_id; + msg = ss.str(); + LOG(ERROR) << msg; + return; } // Update rowset version + int64_t new_version = new_versions[ver_key]; i.set_start_version(new_version); i.set_end_version(new_version); @@ -932,30 +983,40 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, } // for tmp_rowsets_meta // process mow table, check lock and remove pending key + std::vector<std::string> lock_keys; + lock_keys.reserve(request->mow_table_ids().size()); for (auto table_id : request->mow_table_ids()) { - std::string lock_key = meta_delete_bitmap_update_lock_key({instance_id, table_id, -1}); - std::string lock_val; - err = txn->get(lock_key, &lock_val); - LOG(INFO) << "get delete bitmap update lock info, table_id=" << table_id - << " key=" << hex(lock_key) << " err=" << err; + lock_keys.push_back(meta_delete_bitmap_update_lock_key({instance_id, table_id, -1})); + } + std::vector<std::optional<std::string>> lock_values; + err = txn->batch_get(&lock_values, lock_keys); + if (err != TxnErrorCode::TXN_OK) { + ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id + << " err=" << err; + msg = ss.str(); + code = cast_as<ErrCategory::READ>(err); + LOG(WARNING) << msg << " txn_id=" << txn_id; + return; + } + size_t total_locks = lock_keys.size(); + for (size_t i = 0; i < total_locks; i++) { + int64_t table_id = request->mow_table_ids(i); // When the key does not exist, it means the lock has been acquired // by another transaction and successfully committed. - if (err == TxnErrorCode::TXN_KEY_NOT_FOUND) { - msg = "lock is expired"; + if (!lock_values[i].has_value()) { + ss << "get delete bitmap update lock info, lock is expired" + << " table_id=" << table_id << " key=" << hex(lock_keys[i]); code = MetaServiceCode::LOCK_EXPIRED; - return; - } - if (err != TxnErrorCode::TXN_OK) { - ss << "failed to get delete bitmap update lock key info, instance_id=" << instance_id - << " table_id=" << table_id << " key=" << hex(lock_key) << " err=" << err; msg = ss.str(); - code = cast_as<ErrCategory::READ>(err); + LOG(WARNING) << msg << " txn_id=" << txn_id; return; } + DeleteBitmapUpdateLockPB lock_info; - if (!lock_info.ParseFromString(lock_val)) [[unlikely]] { + if (!lock_info.ParseFromString(lock_values[i].value())) [[unlikely]] { code = MetaServiceCode::PROTOBUF_PARSE_ERR; msg = "failed to parse DeleteBitmapUpdateLockPB"; + LOG(WARNING) << msg << " txn_id=" << txn_id; return; } if (lock_info.lock_id() != request->txn_id()) { @@ -963,8 +1024,8 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, code = MetaServiceCode::LOCK_EXPIRED; return; } - txn->remove(lock_key); - LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_key) + txn->remove(lock_keys[i]); + LOG(INFO) << "xxx remove delete bitmap lock, lock_key=" << hex(lock_keys[i]) << " txn_id=" << txn_id; for (auto tablet_id : table_id_tablet_ids[table_id]) { @@ -974,6 +1035,8 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, << " txn_id=" << txn_id; } } + lock_keys.clear(); + lock_values.clear(); // Save rowset meta num_put_keys += rowsets.size(); @@ -1004,8 +1067,8 @@ void MetaServiceImpl::commit_txn(::google::protobuf::RpcController* controller, << " txn_id=" << txn_id; std::string_view ver_key = i.first; - //PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} ver_key.remove_prefix(1); // Remove key space + // PartitionVersionKeyInfo {instance_id, db_id, table_id, partition_id} std::vector<std::tuple<std::variant<int64_t, std::string>, int, int>> out; int ret = decode_key(&ver_key, &out); if (ret != 0) [[unlikely]] { diff --git a/cloud/src/meta-service/txn_kv.cpp b/cloud/src/meta-service/txn_kv.cpp index 54cec231f6f..5afe0edb409 100644 --- a/cloud/src/meta-service/txn_kv.cpp +++ b/cloud/src/meta-service/txn_kv.cpp @@ -23,10 +23,8 @@ #include <algorithm> #include <atomic> #include <cstring> -#include <iomanip> #include <memory> #include <optional> -#include <sstream> #include <string_view> #include <thread> #include <vector> @@ -491,53 +489,62 @@ TxnErrorCode RangeGetIterator::init() { TxnErrorCode Transaction::batch_get(std::vector<std::optional<std::string>>* res, const std::vector<std::string>& keys, const BatchGetOptions& opts) { + struct FDBFutureDelete { + void operator()(FDBFuture* future) { fdb_future_destroy(future); } + }; + + res->clear(); if (keys.empty()) { return TxnErrorCode::TXN_OK; } - StopWatch sw; - std::vector<FDBFuture*> futures; - futures.reserve(keys.size()); - for (const auto& k : keys) { - futures.push_back(fdb_transaction_get(txn_, (uint8_t*)k.data(), k.size(), opts.snapshot)); - } - auto release_futures = [&futures, &sw](int*) { - std::for_each(futures.begin(), futures.end(), - [](FDBFuture* fut) { fdb_future_destroy(fut); }); - g_bvar_txn_kv_batch_get << sw.elapsed_us(); - }; - std::unique_ptr<int, decltype(release_futures)> defer((int*)0x01, std::move(release_futures)); + StopWatch sw; + auto stop_watcher = [&sw](int*) { g_bvar_txn_kv_batch_get << sw.elapsed_us(); }; + std::unique_ptr<int, decltype(stop_watcher)> defer((int*)0x01, std::move(stop_watcher)); + size_t num_keys = keys.size(); res->reserve(keys.size()); - DCHECK(keys.size() == futures.size()); - auto size = futures.size(); - for (auto i = 0; i < size; ++i) { - const auto& fut = futures[i]; - RETURN_IF_ERROR(await_future(fut)); - auto err = fdb_future_get_error(fut); - if (err) { - LOG(WARNING) << __PRETTY_FUNCTION__ - << " failed to fdb_future_get_error err=" << fdb_get_error(err) - << " key=" << hex(keys[i]); - return cast_as_txn_code(err); + std::vector<std::unique_ptr<FDBFuture, FDBFutureDelete>> futures; + futures.reserve(opts.concurrency); + for (size_t i = 0; i < num_keys; i += opts.concurrency) { + size_t size = std::min(i + opts.concurrency, num_keys); + for (size_t j = i; j < size; j++) { + const auto& k = keys[j]; + futures.emplace_back( + fdb_transaction_get(txn_, (uint8_t*)k.data(), k.size(), opts.snapshot)); } - fdb_bool_t found; - const uint8_t* ret; - int len; - err = fdb_future_get_value(fut, &found, &ret, &len); - - if (err) { - LOG(WARNING) << __PRETTY_FUNCTION__ - << " failed to fdb_future_get_value err=" << fdb_get_error(err) - << " key=" << hex(keys[i]); - return cast_as_txn_code(err); - } - if (!found) { - res->push_back(std::nullopt); - continue; + + size_t num_futures = futures.size(); + for (auto j = 0; j < num_futures; j++) { + FDBFuture* future = futures[j].get(); + std::string_view key = keys[i + j]; + RETURN_IF_ERROR(await_future(future)); + fdb_error_t err = fdb_future_get_error(future); + if (err) { + LOG(WARNING) << __PRETTY_FUNCTION__ + << " failed to fdb_future_get_error err=" << fdb_get_error(err) + << " key=" << hex(key); + return cast_as_txn_code(err); + } + fdb_bool_t found; + const uint8_t* ret; + int len; + err = fdb_future_get_value(future, &found, &ret, &len); + if (err) { + LOG(WARNING) << __PRETTY_FUNCTION__ + << " failed to fdb_future_get_value err=" << fdb_get_error(err) + << " key=" << hex(key); + return cast_as_txn_code(err); + } + if (!found) { + res->push_back(std::nullopt); + continue; + } + res->push_back(std::string((char*)ret, len)); } - res->push_back(std::string((char*)ret, len)); + futures.clear(); } + DCHECK_EQ(res->size(), num_keys); return TxnErrorCode::TXN_OK; } diff --git a/cloud/src/meta-service/txn_kv.h b/cloud/src/meta-service/txn_kv.h index 6cf225de3cc..12ee38823f9 100644 --- a/cloud/src/meta-service/txn_kv.h +++ b/cloud/src/meta-service/txn_kv.h @@ -147,10 +147,18 @@ public: virtual TxnErrorCode abort() = 0; struct BatchGetOptions { - BatchGetOptions() : snapshot(false) {}; + BatchGetOptions() : BatchGetOptions(false) {}; + BatchGetOptions(bool s) : snapshot(s), concurrency(1000) {}; + + // if true, `key` will not be included in txn conflict detection this time. + // + // Default: false bool snapshot; - // TODO: Avoid consuming too many resources in one batch - // int limit = 1000; + + // the maximum number of concurrent requests submitted to fdb at one time. + // + // Default: 1000 + int concurrency; }; /** * @brief batch get keys --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
